Big Data Scheduling Best Practices | Migrating from Airflow to Apache DolphinScheduler

Migration Background

Some users originally adopted Airflow as their scheduling system, but because Airflow can only define workflows through code and offers no fine-grained division of resources and projects, it does not fit scenarios that require strong permission control. To meet these needs, some users have to migrate their scheduling system from Airflow to Apache DolphinScheduler.


To address these real user needs, WhaleOps developed the Air2phin migration tool to help users migrate to DolphinScheduler more smoothly. Since Airflow defines workflows through Python code, and some metadata exists only in that code and is never persisted to the database, the analysis and migration steps have to be completed by parsing the Python code itself.

Why migrate to DolphinScheduler

Airflow and DolphinScheduler are both task scheduling systems, and both solve the problem of task orchestration. Each has its own advantages. In this section we only introduce the advantages of DolphinScheduler over Airflow; a detailed comparison of the two will follow in a future article:

  1. Both are mature open source workflow scheduling systems, and both have mature communities. The subtle differences are:
    • Apache DolphinScheduler: Primarily visual, supplemented by an API, with finer-grained permission management, more workflow levels, lower usage costs, and more democratized access to data
    • Airflow: Defines workflows with code, aimed at advanced R&D users. It offers high flexibility, but the cost of use is higher; it is essentially built for R&D personnel.
  2. Because DolphinScheduler stores workflow definitions, task definitions, and task relationships directly in the database:
    • No additional operation is needed when adding or removing master or worker nodes. When Airflow adds scheduler or worker nodes, the DAG files must be copied to the new node.
    • Because there is no file-parsing step to discover workflows and tasks, there is no delay when adding or changing tasks, and no need to sacrifice CPU performance to reduce that delay. Airflow discovers and schedules DAGs in a polling loop, so its scheduler consumes more CPU resources during each loop.
    • The complete historical run state of workflows and tasks is retained. In Airflow, if tasks are removed from the latest DAG definition, their historical state and logs can no longer be found.
    • Version information is natively supported. Airflow DAG versions must be dug out of the git log, and reverting also has to go through git.
  3. DolphinScheduler provides a resource center, making it easier for users to manage and organize local and remote resource files. External resources used by Airflow generally need to be hosted in version control alongside the DAGs via git.
  4. Beyond offline scheduled tasks, DolphinScheduler also supports scheduling-related features such as real-time tasks, data quality checks, and monitoring of physical machine resources. Airflow currently focuses on offline workflow scheduling.
  5. DolphinScheduler is a distributed, decentralized system, so the masters’ server resources are utilized more fully. Airflow scans for schedulable tasks through its scheduler, so its CPU is spent less efficiently than DolphinScheduler’s. See the AWS performance benchmarks for details.

Demands and Challenges

Demands

As a migration tool, Air2phin’s core goal is to migrate Airflow DAGs to DolphinScheduler workflows with as little human intervention as possible.

However, this requires a careful balance. We cannot blindly pursue automation, or the program may become complex, less maintainable, and less general. Especially when compatibility with different Airflow versions is required, striking this balance is a problem Air2phin has to face.

Challenges

  • Syntax differences: Airflow and the DolphinScheduler Python SDK share basic Python syntax (for, if, else), but differ slightly in specific task names and parameters. For example, Airflow’s bash operator corresponds to the Shell task in the DolphinScheduler Python SDK, and the parameters of the two differ as well (see the before/after sketch after this list). The migration needs to handle this logic.
  • Task type differences: Airflow and DolphinScheduler both allow users to extend the system to some degree, for example via custom plug-ins. However, the two differ in the number of task types and in how tasks are encapsulated and abstracted. Some task types exist only in Airflow, and some exist only in DolphinScheduler; these differences need to be handled during conversion.
  • Scheduling differences: Airflow uses Cron expressions (such as 5 4 * * *) or Python’s datetime.timedelta to define the scheduling period, while DolphinScheduler uses a more fine-grained Cron expression such as 0 5 4 * * ? *, so this part of the conversion is also a challenge.
  • Differences in built-in time parameters: Airflow’s built-in time parameters are handled through macros, with Jinja2 templates provided for time calculation, such as ds_add('2015-01-06', -5). DolphinScheduler has its own built-in time definitions and calculation rules: for example, the run date is written as yyyy-MM-dd, and adding or subtracting time is written as yyyy-MM-dd + 1.
  • Extensibility of migration rules: Both Airflow and the DolphinScheduler Python SDK modify their APIs over time, and incompatible changes can break the migration tool. Modifying existing rules and adding new ones therefore needs to be as simple as possible to minimize maintenance costs.
  • Different Airflow versions: There may be differences between Airflow versions. For example, before 2.0.0 the bash operator lived in airflow.operators.bash_operator, but from 2.0.0 onward it lives in airflow.operators.bash.
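
To make the syntax difference concrete, here is a minimal before/after sketch. The Airflow half is standard; the converted half assumes a recent pydolphinscheduler where the workflow container class is Workflow (older releases used ProcessDefinition), and the exact output of a real conversion depends on Air2phin’s rules and the SDK version.

# Airflow: a minimal DAG using the bash operator
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="tutorial", schedule_interval="5 4 * * *", start_date=datetime(2021, 1, 1)) as dag:
    print_date = BashOperator(task_id="print_date", bash_command="date")

# DolphinScheduler Python SDK: the rough equivalent after conversion.
# Note the renamed class (BashOperator -> Shell) and renamed
# parameters (task_id -> name, bash_command -> command).
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell

with Workflow(name="tutorial", schedule="0 5 4 * * ? *", start_time="2021-01-01") as workflow:
    print_date = Shell(name="print_date", command="date")
    workflow.submit()  # submit the workflow definition to DolphinScheduler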

Introduction to Migration Tool

What is Air2phin

Air2phin is a rule-based AST transformer that converts Airflow DAG files into pydolphinscheduler definition files. It uses LibCST to parse and transform the Python code, and YAML files to define the transformation rules. It is a tool that assists users with the conversion, not a one-click conversion tool.

Air2phin’s data flow diagram


  • Read the original Airflow DAG definitions from standard input or files
  • Load the transformation rules from YAML files into Air2phin
  • Parse the content of the Airflow DAGs into a CST tree
  • Alter the CST tree according to the transformation rules
  • Write the converted result to standard output or a file
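
To illustrate the core idea (a simplified sketch, not Air2phin’s actual implementation), the following shows how a LibCST transformer can rename a class reference, the way a replace rule does, while leaving the rest of the source untouched:

import libcst as cst

class RenameBashOperator(cst.CSTTransformer):
    """Rename BashOperator to Shell, as a replace rule would."""

    def leave_Name(self, original_node: cst.Name, updated_node: cst.Name) -> cst.Name:
        if updated_node.value == "BashOperator":
            return updated_node.with_changes(value="Shell")
        return updated_node

source = 'task = BashOperator(task_id="t", bash_command="date")'
module = cst.parse_module(source)               # parse the code into a CST
converted = module.visit(RenameBashOperator())  # apply the transformation
print(converted.code)  # task = Shell(task_id="t", bash_command="date")

A full rule would also rewrite the import statement and the keyword arguments (task_id to name, bash_command to command), which is exactly what the YAML rules shown later describe.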

How to use Air2phin

Air2phin is a Python package, so it is installed through pip. After installation, you can run air2phin migrate ~/airflow/dags to convert all of Airflow’s DAGs into DolphinScheduler Python SDK definitions. At that point Air2phin’s job is done; finally, you only need to execute the converted SDK code with Python to submit the workflows to DolphinScheduler.

# Install package
python -m pip install --upgrade air2phin

# Migrate airflow's dags
air2phin migrate -i ~/airflow/dags

In actual production there may be many DAGs under ~/airflow/dags, and Air2phin processes them serially by default. For faster processing, you can use --multiprocess to let Air2phin convert in multiple processes.

# use multiprocess to convert the airflow dags files
air2phin migrate -i --multiprocess 12 ~/airflow/dags 

After the above conversion, you have turned the Airflow DAG files into DolphinScheduler Python SDK definition scripts. All that remains is to submit them to DolphinScheduler via the Python SDK.

# Install apache-dolphinscheduler according to apache DolphinScheduler server you use, ref: https://dolphinscheduler.apache.org/python/main/#version
python -m pip install apache-dolphinscheduler
# Submit your dolphinscheduler python sdk definition
python ~/airflow/dags/tutorial.py

How to define your own conversion rules in Air2phin

Many Airflow users write custom operators, and converting these operators requires user-defined rules. Fortunately, Air2phin’s rules are based on YAML files, which means users can easily add new ones. Below is a conversion rule YAML file that converts a user-defined Redshift operator into the DolphinScheduler SQL task type.

Assume the user has built a custom Redshift operator on top of airflow.providers.postgres.operators.postgres. The operator code is as follows:

from __future__ import annotations

from typing import Iterable, Mapping

from airflow.providers.postgres.operators.postgres import PostgresOperator

class RedshiftOperator(PostgresOperator):
    def __init__(
        self,
        *,
        sql: str | Iterable[str],
        my_custom_conn_id: str = 'postgres_default',
        autocommit: bool = False,
        parameters: Iterable | Mapping | None = None,
        database: str | None = None,
        runtime_parameters: Mapping | None = None,
        **kwargs,
    ) -> None:
        super().__init__(
            sql=sql,
            postgres_conn_id=my_custom_conn_id,
            autocommit=autocommit,
            parameters=parameters,
            database=database,
            runtime_parameters=runtime_parameters,
            **kwargs,
        )

Since this is a user-defined operator, it is naturally not covered by Air2phin’s built-in conversion rules, so we need to write a custom YAML rule file.

name: RedshiftOperator

migration:
  module:
    - action: replace
      src: utils.operator.RedshiftOperator.RedshiftOperator
      dest: pydolphinscheduler.tasks.sql.Sql
    - action: add
      module: pydolphinscheduler.resources_plugin.Local
  parameter:
    - action: replace
      src: task_id
      dest: name
    - action: add
      arg: datasource_name
      default:
        type: str
        value: "redshift_read_conn"
    - action: add
      arg: resource_plugin
      default:
        type: code
        value: Local(prefix="/path/to/dir/")

Users only need to add this file to Air2phin’s rule path to enable conversion of the custom operator.

air2phin migrate --custom-rules /path/to/RedshiftOperator.yaml ~/airflow/dags
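
For reference, the task definition produced by such a rule should look roughly like the following. This is an illustrative sketch rather than exact Air2phin output, and the task name and SQL statement are placeholders:

# Rough shape of the converted task (illustrative)
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.sql import Sql

sql_task = Sql(
    name="my_redshift_task",                        # mapped from task_id
    datasource_name="redshift_read_conn",           # added by the rule's default
    sql="SELECT * FROM my_table",                   # passed through unchanged
    resource_plugin=Local(prefix="/path/to/dir/"),  # added by the rule's default
)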

How Air2phin solves migration challenges

We mentioned earlier the challenges of migrating from Airflow to DolphinScheduler. Let’s take a look at how Air2phin solves them.

Syntax Difference

Both Airflow and the DolphinScheduler Python SDK are written in Python, so the basic Python syntax is similar. However, since they are two unrelated sets of APIs, they differ in specific parameters, classes, and functions. Air2phin resolves these differences and implements the migration from one platform to the other by defining the appropriate conversion rules in YAML.

YAML file conversion rules:

  • Parameter mapping: For different naming or structures of parameters, mapping rules can be defined in the YAML file to map the parameter names of Airflow to the corresponding parameters of the DolphinScheduler Python SDK.
  • Class and function conversion: If Airflow and the DolphinScheduler Python SDK use different class names or functions, conversion rules for classes and functions can be defined in the YAML file to map Airflow classes and functions to their DolphinScheduler Python SDK equivalents.
  • Error handling and alerting: Given that the two platforms may have different error handling and alerting mechanisms, how to map Airflow’s error handling to the equivalent mechanism of DolphinScheduler can be defined in the YAML file.

By formulating these conversion rules, you can ensure that during migration, Airflow task code is converted into the code required by the DolphinScheduler Python SDK according to the YAML definitions, adapting to the differences between the platforms while preserving the flexibility to add and modify tasks.

Scheduling Difference

There are also some differences between Airflow and the DolphinScheduler Python SDK in scheduling configuration. Airflow uses standard Cron expressions to define task schedules, while the DolphinScheduler Python SDK adopts a more precise Cron strategy. This difference can affect the exact scheduling time and execution frequency of tasks.

  • Airflow’s Cron expressions: Airflow uses general Cron expressions to define the scheduling frequency of tasks. A standard Cron expression consists of five fields, representing minute, hour, day of month, month, and day of week. It lets users define relatively loose scheduling rules, such as once an hour or once a day.
  • Precise Cron scheduling of DolphinScheduler Python SDK: DolphinScheduler introduces a more precise Cron scheduling strategy. It splits Cron expressions into two parts: Basic Cron and Advanced Cron. Basic Cron is used to define rough scheduling rules for tasks, such as minutes, hours, dates, etc. Advanced Cron is used to define more precise scheduling rules, including second-level accuracy. This allows DolphinScheduler to implement more fine-grained task scheduling, which is suitable for scenarios with higher execution time requirements, such as the financial field.

Since the precision of the DolphinScheduler Python SDK is higher than that of Airflow, no precision is lost during conversion, so this problem is easily solved.
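
A minimal sketch of the mapping involved, assuming DolphinScheduler’s Quartz-style seven-field crontab (real Quartz syntax has more rules than this simplification covers):

def airflow_cron_to_dolphinscheduler(cron: str) -> str:
    """Convert a five-field cron into a seven-field Quartz-style cron (simplified)."""
    minute, hour, dom, month, dow = cron.split()
    # Quartz requires exactly one of day-of-month / day-of-week to be '?'
    if dow == "*":
        dow = "?"
    elif dom == "*":
        dom = "?"
    # prepend a seconds field and append a year field
    return " ".join(["0", minute, hour, dom, month, dow, "*"])

print(airflow_cron_to_dolphinscheduler("5 4 * * *"))  # 0 5 4 * * ? *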

Built-in time parameter difference

The built-in time parameter difference refers to the different ways in which Airflow and DolphinScheduler Python SDK use built-in time parameters in task scheduling. Airflow uses Jinja2’s macro function to implement built-in time parameters, while DolphinScheduler’s Python SDK uses a custom way to implement these parameters. These two implementation methods may lead to differences in usage and understanding.

  • Airflow’s Jinja2 macros: Airflow’s built-in time parameters are implemented through Jinja2 macros, which allow special placeholders and functions in DAG files to dynamically generate scheduling times. For example, you can use {{ macros.ds_add(ds, 1) }} to add one day to the scheduling date.
  • Customized implementation of DolphinScheduler Python SDK: When implementing built-in time parameters, DolphinScheduler’s Python SDK may use some custom functions or classes instead of directly using Jinja2 macros. These custom implementation methods may require specific configuration and processing on the DolphinScheduler platform.

Therefore, you need to pay attention to the following when migrating:

  1. Different syntax and methods: The syntax and usage of Airflow’s Jinja2 macros differ greatly from the custom implementation in the DolphinScheduler Python SDK, so some parameters may not migrate correctly. Air2phin keeps the original values of parameters that cannot be migrated automatically.

  2. Functional similarity: Although implemented differently, both aim to provide built-in time parameters for task scheduling. Ensure that migrated tasks correctly use the new platform’s built-in timing parameters.
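
As an illustration, the following hand-written pairs match a few Airflow Jinja2 macros with the DolphinScheduler built-in time parameters they roughly correspond to. The $[...] syntax is taken from DolphinScheduler’s built-in parameter documentation, and the pairing itself is an assumption for illustration, not Air2phin’s rule set:

# Illustrative Airflow macro -> DolphinScheduler built-in parameter pairs (assumed)
TIME_PARAM_EXAMPLES = {
    "{{ ds }}": "$[yyyy-MM-dd]",                       # the schedule date
    "{{ macros.ds_add(ds, 1) }}": "$[yyyy-MM-dd+1]",   # schedule date plus one day
    "{{ macros.ds_add(ds, -5) }}": "$[yyyy-MM-dd-5]",  # schedule date minus five days
}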

Extension of migration rules

Airflow allows users to define and use custom Operators, Hooks, Sensors, and so on to meet specific task requirements. These custom components may be used in DAGs, and the way they are implemented and called may require special handling during migration. The simplest approach is the one described above in “How to define your own conversion rules in Air2phin”: as long as the custom task can be defined in DolphinScheduler, it can be migrated from Airflow to DolphinScheduler.

Migrating different versions of Airflow

Different versions of Airflow have different operator syntax. In versions before 2.0.0, Airflow provided bash support only through the class airflow.operators.bash_operator.BashOperator. From 2.0.0 onward, the recommended class is airflow.operators.bash.BashOperator, with airflow.operators.bash_operator.BashOperator kept as a compatible alias. There are many similar cases, so Air2phin needs to convert both classes into DolphinScheduler’s Shell task type. We implement the transformation of multiple classes by supporting lists in YAML; see the migration.module.* nodes below.

migration:
  module:
    - action: replace
      src:
        - airflow.operators.bash.BashOperator
        - airflow.operators.bash_operator.BashOperator
      dest: pydolphinscheduler.tasks.shell.Shell
  parameter:
    - action: replace
      src: task_id
      dest: name
    - action: replace
      src: bash_command
      dest: command

User Benefits

The Air2phin migration tool allows users to convert Airflow’s DAGs code to DolphinScheduler Python SDK through simple configuration, bringing a lot of benefits to users.

  • Simplify the migration process: The migration tool can automatically handle code conversion, avoiding the complex process of manual line-by-line migration, greatly reducing the workload of developers.
  • Saving Time and Cost: Manually migrating code requires a significant investment of time and human resources. Use migration tools to complete your migration quickly and efficiently, saving time and money.
  • Reduce errors: Manual migration can easily introduce errors, while migration tools can automatically perform conversions based on predefined rules, reducing potential human errors.
  • Standardized code style: The migration tool can generate code based on predefined rules and templates to ensure consistent code style and reduce maintenance costs.
  • Lower technical threshold: The migration tool can hide the underlying technical details, allowing developers who are not familiar with DolphinScheduler to easily migrate tasks.
  • Flexibility and customizability: Good migration tools will usually offer some customizable options to meet the needs of different projects while maintaining flexibility.

Overall, using Air2phin can significantly improve the efficiency and quality of the migration process, reduce risks, and at the same time reduce the workload of developers, bringing time and resource savings to the team, as well as a better development experience.

Problems that Air2phin cannot yet resolve

Air2phin is a tool that helps users migrate from Airflow to Apache DolphinScheduler more easily. The key word for this is “assistance”, which means that it can reduce the user’s migration cost, but it cannot be fully automated. The currently known unsolvable problems are as follows:

  • Task types that do not exist in DolphinScheduler cannot be migrated: Some task types exist only in Airflow and have no DolphinScheduler equivalent. These tasks cannot be migrated automatically and must be handled manually. For example, the Discord operator does not exist in DolphinScheduler, so the original Discord operator definition is retained and needs to be handled manually by the user.
  • Some task attributes cannot be migrated to DolphinScheduler: Some Airflow task attributes, such as on_success_callback and on_retry_callback, do not exist in DolphinScheduler. These attributes are simply dropped during migration.

REF

  • air2phin usage documentation: https://air2phin.readthedocs.io/en/latest/index.html
  • air2phin in PyPI: https://pypi.org/project/air2phin/
  • air2phin GitHub repository: https://github.com/WhaleOps/air2phin

This article is published by Beluga Open Source Technology!