Apache Airflow (4): Airflow scheduling shell command

As mentioned above, the general steps for using Airflow for task scheduling are as follows:

  1. Create a python file and use the appropriate Operators according to actual needs
  2. Pass specific parameters to the different Operators in the python file to define a series of tasks
  3. Define the relationships between the Tasks in the python file to form a DAG
  4. Upload the python file for execution; when the DAG is scheduled, each task produces a Task Instance
  5. Use the command line or the WebUI to view and manage them

The above python file is the Airflow python script, which uses code to specify the structure of the DAG.

Let’s take the scheduling and execution of shell commands as an example to explain the use of Airflow.

1. First we need to create a python file and import the required libraries

# Import the DAG object, which needs to be instantiated later.
from airflow import DAG

# Import the BashOperator; we use this Operator to execute bash/shell commands
from airflow.operators.bash import BashOperator

Note: The above code can be written in a development tool (IDE), but the apache-airflow package must be installed in the python3.7 environment being used.

D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
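
If you want to confirm that the package landed in the environment you intend to use, a minimal check like the following (run inside the python3.7 interpreter) prints the installed version:

# Sanity check that apache-airflow is importable in this environment
import airflow

print(airflow.__version__)  # should print 2.1.3 if the install above succeeded
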
2. Instantiate DAG
from datetime import datetime, timedelta

# Define some default parameters in default_args (a python dict); they are applied when instantiating the DAG.
default_args = {
    'owner': 'airflow', # owner name
    'start_date': datetime(2021, 9, 4), # The time when execution starts for the first time, which is UTC time
    'retries': 1, # Number of failed retries
    'retry_delay': timedelta(minutes=5), # Failure retry interval
}

dag = DAG(
    dag_id='myairflow_execute_bash',     # DAG id, must consist entirely of letters, numbers, and underscores
    default_args=default_args,           # the default parameters defined above (dict)
    schedule_interval=timedelta(days=1)  # how often the DAG runs; timedelta supports weeks, days, hours, minutes, seconds, milliseconds
)
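
Note that schedule_interval is not limited to timedelta; it also accepts cron presets such as '@daily' and plain cron expressions. A sketch of the same DAG with a cron schedule, as an alternative to (not in addition to) the definition above; with timedelta the interval is measured from start_date, while a cron expression anchors runs to clock time:

# Alternative form of the DAG above, using a cron expression instead of timedelta (sketch only)
dag = DAG(
    dag_id='myairflow_execute_bash',
    default_args=default_args,
    schedule_interval='0 0 * * *'  # run daily at midnight UTC, roughly equivalent to timedelta(days=1); '@daily' is the preset form
)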

Notice:

  • There are three ways to instantiate DAG

First way:

with DAG("my_dag_name") as dag:
    op = XXOperator(task_id="task")

The second way (used above):

my_dag = DAG("my_dag_name")
op = XXOperator(task_id="task", dag=my_dag)

The third way (using the @dag decorator; a runnable sketch appears after this list):

@dag(start_date=days_ago(2))
def generate_dag():
    op = XXOperator(task_id="task")

dag = generate_dag()
  • BaseOperator basic parameter description:

You can refer to:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator to view more BaseOperator parameters.

  • DAG parameter description

You can refer to:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html to view the DAG parameter descriptions, or click through to the DAG class in your development tool to see the corresponding parameters in the source code.
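
For reference, here is a runnable sketch of the third (decorator) style, assuming Airflow 2.x with the TaskFlow decorators available; the dag_id and the echo command are placeholders used only for illustration:

# Runnable sketch of the decorator style (third way); names are illustrative only
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

@dag(dag_id="decorator_style_example", schedule_interval=None, start_date=days_ago(2))
def generate_dag():
    # operators created inside the decorated function are attached to this DAG automatically
    BashOperator(task_id="task", bash_command='echo "hello from decorator style"')

dag = generate_dag()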

3. Define Task

When an Operator is instantiated, a Task is generated. A Task object is created from an Operator class through its constructor, and every constructor takes "task_id" as the unique identifier of the task.

Next we define three Operators, that is, three Tasks; each task_id must be unique within the DAG.

# There are multiple Operator types; BashOperator is used here
first = BashOperator(
    task_id='first',
    bash_command='echo "run first task"',
    dag=dag
)

middle = BashOperator(
    task_id='middle',
    bash_command='echo "run middle task"',
    dag=dag
)

last = BashOperator(
    task_id='last',
    bash_command='echo "run last task"',
    dag=dag,
    retries=3
)

Notice:

  • Each Operator can be passed its own parameters to override the DAG's default parameters. For example, retries=3 in the last task replaces the default of 1. The precedence rules for task parameters are: ① explicitly passed arguments, ② values present in the default_args dictionary, ③ the operator's own defaults (if any). A small sketch after this list illustrates the ordering.
  • For how to use BashOperator, please refer to: http://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html#howto-operator-bashoperator
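
The sketch below illustrates the precedence order with a hypothetical "precedence_demo" DAG (not part of the workflow above); the operator's own default for retries is typically 0 unless overridden in airflow.cfg:

# Hypothetical DAG, used only to illustrate parameter precedence
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("precedence_demo",
         default_args={'retries': 1},
         start_date=datetime(2021, 9, 4),
         schedule_interval=None) as demo_dag:
    a = BashOperator(task_id="a", bash_command="echo a")             # nothing passed -> retries=1 from default_args
    b = BashOperator(task_id="b", bash_command="echo b", retries=3)  # explicitly passed -> retries=3 wins

print(a.retries, b.retries)  # prints: 1 3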

4. Set task dependencies

# Use set_upstream and set_downstream to set dependencies. Circular dependencies are not allowed, otherwise an error will be reported.
# middle.set_upstream(first) # middle will be executed after first finishes
# last.set_upstream(middle) # last will be executed after middle finishes

# You can also use the bit-shift operators (>> / <<) to set dependencies
first >> middle >> last # first is executed first, then middle, then last
# first >> [middle, last] # first is executed first, then middle and last run in parallel

Note: When the script is executed, an exception will be thrown if a circular dependency (for example: A->B->C->A) is found in the DAG. For more on DAG task dependencies, please refer to the official website: http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies
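
The equivalent ways of declaring the same chain can be summarized in a small self-contained sketch; the "demo_deps" DAG and the t1/t2/t3 tasks here are hypothetical and used only to show the syntax:

# Hypothetical DAG showing equivalent ways to declare the chain t1 -> t2 -> t3
from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator

with DAG("demo_deps", start_date=datetime(2021, 9, 4), schedule_interval=None) as demo:
    t1 = BashOperator(task_id="t1", bash_command="echo 1")
    t2 = BashOperator(task_id="t2", bash_command="echo 2")
    t3 = BashOperator(task_id="t3", bash_command="echo 3")

    # any ONE of the following lines declares t1 -> t2 -> t3
    t1 >> t2 >> t3                                  # bit-shift operators
    # t2.set_upstream(t1); t3.set_upstream(t2)      # set_upstream form
    # t1.set_downstream(t2); t2.set_downstream(t3)  # set_downstream form
    # chain(t1, t2, t3)                             # chain() helper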

5. Upload python configuration script

So far, the python configuration is as follows:

# Import the DAG object, which needs to be instantiated later.
from airflow import DAG

# Import the BashOperator; we use this Operator to execute bash/shell commands
from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

# Define some default parameters in default_args (a python dict); they are applied when instantiating the DAG.
default_args = {
    'owner': 'airflow', # owner name
    'start_date': datetime(2021, 9, 4), # The time when execution starts for the first time, which is UTC time
    'retries': 1, # Number of failed retries
    'retry_delay': timedelta(minutes=5), # Failure retry interval
}

dag = DAG(
    dag_id='myairflow_execute_bash',     # DAG id, must consist entirely of letters, numbers, and underscores
    default_args=default_args,           # the default parameters defined above (dict)
    schedule_interval=timedelta(days=1)  # how often the DAG runs; timedelta supports weeks, days, hours, minutes, seconds, milliseconds
)

# There are multiple Operator types; BashOperator is used here
first = BashOperator(
    task_id='first',
    bash_command='echo "run first task"',
    dag=dag
)

middle = BashOperator(
    task_id='middle',
    bash_command='echo "run middle task"',
    dag=dag
)

last = BashOperator(
    task_id='last',
    bash_command='echo "run last task"',
    dag=dag,
    retries=3
)

# Use set_upstream and set_downstream to set dependencies. Circular dependencies are not allowed, otherwise an error will be reported.
# middle.set_upstream(first) # middle will be executed after first finishes
# last.set_upstream(middle) # last will be executed after middle finishes

# You can also use the bit-shift operators (>> / <<) to set dependencies
first >> middle >> last # first is executed first, then middle, then last
# first >> [middle, last] # first is executed first, then middle and last run in parallel

Upload the python configuration file above to the $AIRFLOW_HOME/dags directory. By default, $AIRFLOW_HOME is the "/root/airflow" directory on the installation node; the dags subdirectory is not created automatically and needs to be created manually.
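
One optional way to verify that the scheduler will be able to parse the uploaded file is to load it through a DagBag from a python shell on the Airflow node; the path below assumes the default /root/airflow location mentioned above:

# Optional sanity check that the uploaded file parses and the DAG is picked up
from airflow.models import DagBag

dagbag = DagBag(dag_folder="/root/airflow/dags", include_examples=False)
print(dagbag.import_errors)                     # {} means no import/syntax errors
print("myairflow_execute_bash" in dagbag.dags)  # True once the DAG is discovered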

6. Restart Airflow

Use "ps aux|grep webserver" and "ps aux|grep scheduler" to find the corresponding airflow processes, kill them, and then restart the Airflow webserver and scheduler. After restarting, you can see the corresponding DAG ID "myairflow_execute_bash" in the Airflow WebUI.

7. Execute airflow

Execute the DAG as follows: first unpause (switch on) the workflow in the WebUI, then click "Trigger DAG" to run it. You can then see that the tasks execute successfully.

View task execution log: