Solution: How to execute pure SQL files on Amazon EMR Serverless?


For a long time, SQL has been the preferred language for ETL thanks to its simplicity, ease of use, and high development efficiency, and it plays an irreplaceable role in building data warehouses and data lakes. This is precisely why Hive and Spark SQL hold such a firm position in today’s big data ecosystem. In a regular Spark environment, developers can use the spark-sql command to execute SQL files directly. This seemingly ordinary capability is actually very important: on the one hand, it greatly lowers the barrier to using Spark, since anyone who can write SQL can use Spark; on the other hand, driving SQL execution from the command line greatly simplifies job submission, making job submission itself “codified” and thus well suited to large-scale engineering and automated deployment.
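As a point of reference (not part of the EMR Serverless workflow described below), this is roughly what that looks like with the spark-sql CLI; the file path and variable name are purely illustrative:

# Run all statements in a SQL file with the spark-sql CLI
spark-sql -f /path/to/my-etl.sql

# Pass a Hive-style substitution variable, referenced inside the file as ${hivevar:dt}
spark-sql -f /path/to/my-etl.sql --hivevar dt=2008-07-15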

Unfortunately, Amazon EMR Serverless does not provide native support for executing SQL files: users can only embed SQL statements in Scala/Python code, which is unfriendly to those who rely on pure SQL to build data warehouses or data lakes. To address this, we developed a set of utility classes for reading, parsing, and executing SQL files. With these utilities, users can execute SQL files directly on Amazon EMR Serverless. This article introduces the solution in detail.

1. Solution design

Since SQL statements are executed in the Spark programming environment via spark.sql("..."), we can design a general-purpose job class that, based on the parameters passed at startup, reads the SQL file at a specified location, splits it into individual SQL statements, and executes each one with spark.sql("..."). To make the job class more flexible and versatile, wildcards can be introduced so that multiple SQL files can be loaded and executed at once. In addition, ETL jobs often need to process a specific batch based on time parameters generated by the job scheduler, and those parameters must also be applied inside the SQL. The job class should therefore also allow users to embed custom variables in the SQL files and assign values to them via parameters at submission time. Based on this design, we developed a project that implements the functionality described above. The project address is:

| Project name | Project address |
| --- | --- |
| Amazon EMR Serverless Utilities | https://github.com/bluishglc/emr-serverless-utils |

The com.github.emr.serverless.SparkSqlJob class in the project is the general-purpose SQL job class. It accepts two optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --sql-files | Path of the SQL file(s) to execute; Java file system wildcards are supported, so multiple files can be executed together | s3://my-spark-sql-job/sqls/insert-into-*.sql |
| --sql-params | Values for the custom variables of the form ${K1}, ${K2}, … that appear in the SQL files, given as K1=V1,K2=V2,... | CUST_CITY=NEW YORK,ORD_DATE=2008-07-15 |

The program has the following characteristics:

① A single SQL file may contain multiple SQL statements
② Variables of the form ${K1}, ${K2}, … may be defined in the SQL files and assigned values at job submission time via parameters of the form K1=V1,K2=V2,...
③ Java file system wildcards are supported, so multiple SQL files can be executed at once

Below, we introduce and demonstrate how to use the project’s utility classes to submit pure SQL jobs from both the AWS console and the command line.

2. Practical demonstration

2.1. Environment preparation

To submit a job on EMR Serverless, you need an “EMR Serverless Application” and an “EMR Serverless Job Execution Role”; the latter must have read and write permissions on S3 and the Glue Data Catalog. The Application can easily be created through the wizard on the EMR Serverless console (EMR Studio), with all default settings being sufficient, and the Execution Role can be created quickly with the script provided in Section 5 of the article “CDC One-click Lake Entry: When Apache Hudi DeltaStreamer Meets Serverless Spark”.
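If you prefer the command line, the Application can also be created with the AWS CLI. The sketch below is illustrative: the release label is only an example to adjust to your region, and the Execution Role still has to be created separately as described above.

# Illustrative: create an EMR Serverless Application from the CLI
aws emr-serverless create-application \
    --name my-spark-sql-job \
    --type SPARK \
    --release-label emr-6.11.0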

Next, prepare the Jar package and SQL files required to submit the job. First create a bucket on S3; the bucket name used in this article is my-spark-sql-job (remember to replace it with your own bucket name when following along in your environment). Then download the compiled emr-serverless-utils.jar package from [here] and upload it to the s3://my-spark-sql-job/jars/ directory.


Five sample SQL files are also used in the demonstration. Download and decompress them from [here], then upload them to the s3://my-spark-sql-job/sqls/ directory.

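Assuming the Jar and the decompressed SQL files are in the current local directory, the uploads can be done with the AWS CLI as follows (the local paths and bucket name are illustrative):

# Illustrative: create the bucket and upload the Jar package and SQL files
aws s3 mb s3://my-spark-sql-job
aws s3 cp ./emr-serverless-utils-1.0.jar s3://my-spark-sql-job/jars/
aws s3 cp ./sqls/ s3://my-spark-sql-job/sqls/ --recursive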

2.2. Submit a pure SQL file job on the console

2.2.1. Execute a single SQL file

Open the EMR Serverless console (EMR Studio) and submit a job as follows under the selected EMR Serverless Application:


① Script location: Set to the previously uploaded Jar package path s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar
② Main class: Set to com.github.emr.serverless.SparkSqlJob
③ Script arguments: Set to ["--sql-files","s3://my-spark-sql-job/sqls/drop-tables.sql"]

No special settings are required for the other options; the default configuration is sufficient. For jobs deployed to production, you can configure them flexibly according to your needs, for example the Spark Driver/Executor resource allocation. One reminder: jobs created through the console enable the Glue Data Catalog by default (i.e., Additional settings -> Metastore configuration -> Use AWS Glue Data Catalog is checked by default). To make it easy to verify the results of the SQL scripts in Glue and Athena, it is recommended that you keep this default configuration.

The above configuration describes the following job: launch a Spark job using com.github.emr.serverless.SparkSqlJob from s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar as the main class. The argument ["--sql-files","s3://my-spark-sql-job/sqls/drop-tables.sql"] is passed to SparkSqlJob to tell it the location of the SQL file to execute. The SQL file run by this job contains only three simple DROP TABLE statements; it is a basic example demonstrating the tool’s ability to execute multiple SQL statements in a single file.

2.2.2. Execute SQL files with custom parameters

Next we demonstrate the second capability of the job class: executing SQL files with custom parameters. Create a new job or simply clone the previous one (select the previous job on the console and click Actions -> Clone job), then set “Script arguments” to:

["--sql-files","s3://my-spark-sql-job/sqls/create-tables.sql","--sql-params","APP_S3_HOME=s3://my -spark-sql-job"]


In addition to specifying SQL files with the --sql-files parameter, this job also uses the --sql-params parameter to assign values to the user-defined variables that appear in the SQL. As introduced earlier, APP_S3_HOME=s3://my-spark-sql-job is a “Key=Value” pair, meaning the value s3://my-spark-sql-job is assigned to the variable APP_S3_HOME, so every occurrence of ${APP_S3_HOME} in the SQL will be replaced with s3://my-spark-sql-job. Looking at the create-tables.sql file, you can find the custom variable ${APP_S3_HOME} in the LOCATION clause of the CREATE TABLE statement:

CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (
    ...
)
...
LOCATION '${APP_S3_HOME}/data/orders/';

When SparkSqlJob reads the SQL file, it replaces every occurrence of ${APP_S3_HOME} with s3://my-spark-sql-job, so the SQL actually executed becomes:

CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (
    ...
)
...
LOCATION 's3://my-spark-sql-job/data/orders/';

After the job is submitted and executed, you can log in to the Athena console to check whether the tables were created successfully.
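Alternatively, a quick command-line check can be made against the Glue Data Catalog; the command below assumes the tables were created in the default Glue database, since the sample SQL does not specify one:

# Illustrative check: list the table names in the (assumed) default Glue database
aws glue get-tables --database-name default \
    --query 'TableList[].Name' --output text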

2.2.3. Execute multiple files using wildcards

Sometimes we need to batch-execute all SQL files in a folder, or use wildcards to selectively execute some of them. SparkSqlJob supports such requirements through Java file system wildcards. The following job demonstrates their use. Again, create a new job or clone the previous one, then set “Script arguments” to:

["--sql-files","s3://my-spark-sql-job/sqls/insert-into-*.sql"]


The --sql-files parameter of this job uses a path wildcard: insert-into-*.sql matches both insert-into-orders.sql and insert-into-customers.sql, which insert multiple records into the ORDERS and CUSTOMERS tables respectively. After execution completes, you can log in to the Athena console to check whether the tables contain data.
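Since these are external tables whose LOCATION points under ${APP_S3_HOME}/data/, another quick check is to list those prefixes; the customers path below is assumed to mirror the orders path shown earlier:

# Illustrative spot-check: list the data files written under the table locations
aws s3 ls s3://my-spark-sql-job/data/orders/ --recursive
aws s3 ls s3://my-spark-sql-job/data/customers/ --recursive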

2.2.4. A composite example

Finally, let’s submit a more representative composite example: file wildcards plus user-defined parameters. Once again, create a new job or clone the previous one, then set “Script arguments” to:

["--sql-files","s3://my-spark-sql-job/sqls/select-*.sql","--sql-params","APP_S3_HOME=s3://my -spark-sql-job,CUST_CITY=NEW YORK,ORD_DATE=2008-07-15"]


The --sql-files parameter of this job uses the path wildcard select-*.sql, which matches the select-tables.sql file. That file contains three user-defined variables: ${APP_S3_HOME}, ${CUST_CITY}, and ${ORD_DATE}:

CREATE EXTERNAL TABLE ORDERS_CUSTOMERS
    ...
    LOCATION '${APP_S3_HOME}/data/orders_customers/'
AS SELECT
    ...
WHERE
    C.CUST_CITY = '${CUST_CITY}' AND
    O.ORD_DATE = CAST('${ORD_DATE}' AS DATE);

The --sql-params parameter assigns values to these three custom variables: APP_S3_HOME=s3://my-spark-sql-job, CUST_CITY=NEW YORK, and ORD_DATE=2008-07-15, so the SQL above is converted into the following before execution:

CREATE EXTERNAL TABLE ORDERS_CUSTOMERS
    ...
    LOCATION 's3://my-spark-sql-job/data/orders_customers/'
AS SELECT
    ...
WHERE
    C.CUST_CITY = 'NEW YORK' AND
    O.ORD_DATE = CAST('2008-07-15' AS DATE);

At this point, all functions of submitting pure SQL file jobs through the console have been demonstrated.

2.3. Submit pure SQL file jobs through the command line

In practice, many EMR Serverless users do not submit jobs from the console but through the AWS CLI, which is more common in engineering code and job scheduling. So next we introduce how to submit a pure SQL file job from the command line.

The command-line method used in this article follows the best practices given in the article “Best Practices: How to Submit an Amazon EMR Serverless Job Elegantly?”. First, log in to a Linux environment with the AWS CLI installed and user credentials configured (Amazon Linux 2 is recommended), and install jq, a command-line tool for manipulating JSON files, with sudo yum -y install jq (it is used by the scripts that follow). Then complete the following preparations:

① Create or select a job-specific working directory and S3 bucket
② Create or select an EMR Serverless Execution Role
③ Create or select an EMR Serverless Application

Next, export all environment-related variables (please replace the corresponding values in the command line according to your AWS account and local environment):

export APP_NAME='change-to-your-app-name'
export APP_S3_HOME='change-to-your-app-s3-home'
export APP_LOCAL_HOME='change-to-your-app-local-home'
export EMR_SERVERLESS_APP_ID='change-to-your-application-id'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='change-to-your-execution-role-arn'

Here is an example:

export APP_NAME='my-spark-sql-job'
export APP_S3_HOME='s3://my-spark-sql-job'
export APP_LOCAL_HOME='/home/ec2-user/my-spark-sql-job'
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'

The article “Best Practices: How to Submit an Amazon EMR Serverless Job Elegantly?” provides several very practical general-purpose scripts for operating jobs, and this article reuses them directly. However, since we will submit several jobs with different parameters each time, for ease of use and to keep the text concise we wrap part of the original script into a shell function named submit-spark-sql-job:

submit-spark-sql-job() {
    sqlFiles="$1"
    sqlParams="$2"
    cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
    "name": "my-spark-sql-job",
    "applicationId": "$EMR_SERVERLESS_APP_ID",
    "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
    "jobDriver":{
        "sparkSubmit":{
        "entryPoint":"$APP_S3_HOME/jars/emr-serverless-utils-1.0.jar",
        "entryPointArguments":[
            $([[ -n "$sqlFiles" ]] & amp; & amp; echo ""--sql-files", "$sqlFiles"")
            $([[ -n "$sqlParams" ]] & amp; & amp; echo ","--sql-params", "$sqlParams"")
        ],
         "sparkSubmitParameters":"--class com.github.emr.serverless.SparkSqlJob --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
   },
   "configurationOverrides":{
        "monitoringConfiguration":{
            "s3MonitoringConfiguration":{
                "logUri": "$APP_S3_HOME/logs"
            }
        }
   }
}
EOF
    jq . $APP_LOCAL_HOME/start-job-run.json
    export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
        --no-paginate --no-cli-pager --output text \
        --name my-spark-sql-job \
        --application-id $EMR_SERVERLESS_APP_ID \
        --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
        --execution-timeout-minutes 0 \
        --cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
        --query jobRunId)
    now=$(date +%s)sec
    while true; do
        jobStatus=$(aws emr-serverless get-job-run \
                        --no-paginate --no-cli-pager --output text \
                        --application-id $EMR_SERVERLESS_APP_ID \
                        --job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
                        --query jobRun.state)
        if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
            for i in {0..5}; do
                echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
                sleep 1
            done
        else
            printf "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]Ps\
\
"
            break
        fi
    done
}

The function accepts two positional arguments:

① The parameter at the first position is used to specify the SQL file path, and its value will be passed to --sql-files of SparkSqlJob
② The parameter at the second position is used to specify the user-defined variable in the SQL file, and its value will be passed to --sql-params of SparkSqlJob

The Jar package and SQL files used by the function are the same as those prepared in “2.1. Environment preparation”, so the preparation in Section 2.1 must also be completed before submitting jobs with this script. Next, we use this function to perform the same operations as in Section 2.2.

2.3.1. Execute a single SQL file

The operations in this section are exactly the same as those in Section 2.2.1, except that the command line is used instead. The command is as follows:

submit-spark-sql-job "$APP_S3_HOME/sqls/drop-tables.sql"

2.3.2. Execute SQL files with custom parameters

The operations in this section are exactly the same as those in Section 2.2.2, except that the command line is used instead. The command is as follows:

submit-spark-sql-job "$APP_S3_HOME/sqls/create-tables.sql" "APP_S3_HOME=$APP_S3_HOME"

2.3.3. Use wildcards to execute multiple files

The operations in this section are exactly the same as those in Section 2.2.3, except that the command line is used instead. The command is as follows:

submit-spark-sql-job "$APP_S3_HOME/sqls/insert-into-*.sql"

2.3.4. A composite example

The operations in this section are exactly the same as those in Section 2.2.4, except that the command line is used instead. The command is as follows:

submit-spark-sql-job "$APP_S3_HOME/sqls/select-tables.sql" "APP_S3_HOME=$APP_S3_HOME,CUST_CITY=NEW YORK,ORD_DATE=2008-07-15"

3. Calling the tool classes from source code

Although spark.sql(...) can be used to execute SQL statements directly in the Spark programming environment, as the previous examples show, the SQL file execution capability provided by emr-serverless-utils is both more convenient and more powerful. So, to finish, we briefly introduce how to call the relevant tool classes from your own source code to obtain the same SQL file processing capabilities. The steps are very simple; you only need to:

① Load emr-serverless-utils-1.0.jar onto your classpath (see the submission sketch at the end of this section)
② Declare the implicit type conversions
③ Call execSqlFile() directly on spark

// Initialize the SparkSession and other setup
...

// Declare the implicit type conversions
import com.github.emr.serverless.SparkSqlSupport._

// Call execSqlFile() directly on spark
spark.execSqlFile("s3://YOUR/XXX.sql")

// Call execSqlFile() with custom variable assignments
spark.execSqlFile("s3://YOUR/XXX.sql", "K1=V1,K2=V2,...")

// Other operations
...
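For step ①, if your own job runs on EMR Serverless, one way to put emr-serverless-utils-1.0.jar on the classpath is Spark’s --jars option. The sketch below is not from the original project; your-app.jar and your.main.Class are placeholders for your own artifact:

# Hypothetical sketch: submit your own Jar and add emr-serverless-utils to the classpath via --jars
aws emr-serverless start-job-run \
    --name my-spark-app \
    --application-id $EMR_SERVERLESS_APP_ID \
    --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://my-spark-sql-job/jars/your-app.jar",
            "sparkSubmitParameters": "--class your.main.Class --jars s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar"
        }
    }'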