Airflow integration with Grafana (Prometheus)

Use Grafana to monitor Airflow metrics and implement alerting.

Environment preparation

Airflow, Grafana, Prometheus, and Consul have been deployed in advance.
(I installed Consul myself so that Prometheus does not have to be restarted every time a new target server is added.)

Installation steps

1. Configure airflow.cfg

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 9125
statsd_prefix = airflow
statsd_allow_list =
  • Airflow sends StatsD packets over UDP to port 9125; make sure that port is reachable by the exporter
  • statsd_allow_list: configure this as you see fit; typically scheduler, executor, dagrun (leaving it empty sends all metrics)
  • Note: StatsD support also requires the StatsD client library (e.g. pip install 'apache-airflow[statsd]')
  • For details, see the https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html document
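To sanity-check this side of the pipeline, you can emit a packet in the same wire format Airflow's StatsD client uses. This is a minimal sketch: `send_statsd` is a hypothetical helper (not an Airflow API), and UDP delivery is fire-and-forget, so a successful send only proves the packet left.

```python
import socket

def send_statsd(metric: str, value: int, mtype: str = "c",
                host: str = "localhost", port: int = 9125) -> int:
    """Send one StatsD packet (<name>:<value>|<type>) over UDP."""
    payload = f"{metric}:{value}|{mtype}".encode()
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(payload, (host, port))

# Mimic the counter Airflow emits for a successful task instance;
# with the exporter running, it should show up on ip:9102/metrics.
send_statsd("airflow.ti_successes", 1)
```

If the exporter (set up in the next step) is listening on 9125, the mapped metric appears on its /metrics page shortly after.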

2. Start statsd_exporter (here via Docker; you can also download the binary yourself from GitHub)

# Create a file named docker-compose.yml
version: '2.1'
services:
    statsd-exporter:
        image: prom/statsd-exporter:v0.22.3
        restart: always
        volumes:
            # Map the local /data/statsd_export/statsd_mapping.yml into the container
            - /data/statsd_export/statsd_mapping.yml:/tmp/statsd_mapping.yml
        ports:
            - 9102:9102
            - 9125:9125
            - 9125:9125/udp
        command: "--statsd.mapping-config=/tmp/statsd_mapping.yml"

Below is the statsd_mapping.yml configuration, copied directly from the example on GitHub:

mappings:
  # Airflow StatsD metrics mappings (https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html)
  # === Counters ===
  - match: "(.+)\\.(.+)_start$"
    match_metric_type: counter
    name: "af_agg_job_start"
    match_type: regex
    labels:
      airflow_id: "$1"
      job_name: "$2"
  - match: "(.+)\\.(.+)_end$"
    match_metric_type: counter
    name: "af_agg_job_end"
    match_type: regex
    labels:
      airflow_id: "$1"
      job_name: "$2"
  - match: "(.+)\\.operator_failures_(.+)$"
    match_metric_type: counter
    name: "af_agg_operator_failures"
    match_type: regex
    labels:
      airflow_id: "$1"
      operator_name: "$2"
  - match: "(.+)\\.operator_successes_(.+)$"
    match_metric_type: counter
    name: "af_agg_operator_successes"
    match_type: regex
    labels:
      airflow_id: "$1"
      operator_name: "$2"
  - match: "*.ti_failures"
    match_metric_type: counter
    name: "af_agg_ti_failures"
    labels:
      airflow_id: "$1"
  - match: "*.ti_successes"
    match_metric_type: counter
    name: "af_agg_ti_successes"
    labels:
      airflow_id: "$1"
  - match: "*.zombies_killed"
    match_metric_type: counter
    name: "af_agg_zombies_killed"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler_heartbeat"
    match_metric_type: counter
    name: "af_agg_scheduler_heartbeat"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.processes"
    match_metric_type: counter
    name: "af_agg_dag_processing_processes"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.killed_externally"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_killed_externally"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.running"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_running"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.starving"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_starving"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.orphaned_tasks.cleared"
    match_metric_type: counter
    name: "af_agg_scheduler_orphaned_tasks_cleared"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.orphaned_tasks.adopted"
    match_metric_type: counter
    name: "af_agg_scheduler_orphaned_tasks_adopted"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.critical_section_busy"
    match_metric_type: counter
    name: "af_agg_scheduler_critical_section_busy"
    labels:
      airflow_id: "$1"
  - match: "*.sla_email_notification_failure"
    match_metric_type: counter
    name: "af_agg_sla_email_notification_failure"
    labels:
      airflow_id: "$1"
  - match: "*.ti.start.*.*"
    match_metric_type: counter
    name: "af_agg_ti_start"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
  - match: "*.ti.finish.*.*.*"
    match_metric_type: counter
    name: "af_agg_ti_finish"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
      state: "$4"
  - match: "*.dag.callback_exceptions"
    match_metric_type: counter
    name: "af_agg_dag_callback_exceptions"
    labels:
      airflow_id: "$1"
  - match: "*.celery.task_timeout_error"
    match_metric_type: counter
    name: "af_agg_celery_task_timeout_error"
    labels:
      airflow_id: "$1"

  # === Gauges ===
  - match: "*.dagbag_size"
    match_metric_type: gauge
    name: "af_agg_dagbag_size"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.import_errors"
    match_metric_type: gauge
    name: "af_agg_dag_processing_import_errors"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.total_parse_time"
    match_metric_type: gauge
    name: "af_agg_dag_processing_total_parse_time"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.last_runtime.*"
    match_metric_type: gauge
    name: "af_agg_dag_processing_last_runtime"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dag_processing.last_run.seconds_ago.*"
    match_metric_type: gauge
    name: "af_agg_dag_processing_last_run_seconds"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dag_processing.processor_timeouts"
    match_metric_type: gauge
    name: "af_agg_dag_processing_processor_timeouts"
    labels:
      airflow_id: "$1"
  - match: "*.executor.open_slots"
    match_metric_type: gauge
    name: "af_agg_executor_open_slots"
    labels:
      airflow_id: "$1"
  - match: "*.executor.queued_tasks"
    match_metric_type: gauge
    name: "af_agg_executor_queued_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.executor.running_tasks"
    match_metric_type: gauge
    name: "af_agg_executor_running_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.pool.open_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_open_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.queued_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_queued_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.running_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_running_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.starving_tasks.*"
    match_metric_type: gauge
    name: "af_agg_pool_starving_tasks"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.smart_sensor_operator.poked_tasks"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.poked_success"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_success"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.poked_exception"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_exception"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.exception_failures"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_exception_failures"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.infra_failures"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_infra_failures"
    labels:
      airflow_id: "$1"

  # === Timers ===
  - match: "*.dagrun.dependency-check.*"
    match_metric_type: observer
    name: "af_agg_dagrun_dependency_check"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dag.*.*.duration"
    match_metric_type: observer
    name: "af_agg_dag_task_duration"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
  - match: "*.dag_processing.last_duration.*"
    match_metric_type: observer
    name: "af_agg_dag_processing_duration"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dagrun.duration.success.*"
    match_metric_type: observer
    name: "af_agg_dagrun_duration_success"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dagrun.duration.failed.*"
    match_metric_type: observer
    name: "af_agg_dagrun_duration_failed"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dagrun.schedule_delay.*"
    match_metric_type: observer
    name: "af_agg_dagrun_schedule_delay"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.scheduler.critical_section_duration"
    match_metric_type: observer
    name: "af_agg_scheduler_critical_section_duration"
    labels:
      airflow_id: "$1"
  - match: "*.dagrun.*.first_task_scheduling_delay"
    match_metric_type: observer
    name: "af_agg_dagrun_first_task_scheduling_delay"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
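As a sanity check on the escaping in the regex rules, you can apply one by hand. In double-quoted YAML a literal dot must be written `\\.`, so the pattern the exporter actually sees is `(.+)\.(.+)_start$`. A minimal sketch (the sample metric name is made up for illustration):

```python
import re

# Regex from the first counter rule; "(.+)\\.(.+)_start$" in the YAML
# becomes (.+)\.(.+)_start$ once YAML unescapes the backslash.
pattern = re.compile(r"(.+)\.(.+)_start$")

m = pattern.match("airflow.scheduler_job_start")
labels = {"airflow_id": m.group(1), "job_name": m.group(2)}
print(labels)  # {'airflow_id': 'airflow', 'job_name': 'scheduler_job'}
```

The `$1`/`$2` label references in the mapping file correspond to the two capture groups here.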
  • Start command
# I won't go into further detail on the Docker configuration above.
docker-compose up -d

3. After the configuration is complete and Docker is started, restart Airflow as well

Use ip:9102/metrics to check. If information like the following appears, the configuration is correct.
(Screenshot: sample /metrics output)

4. I use Consul for unified management, so my registration command is:

curl -X PUT -d '{"id": "airflow","name": "airflow","address": "xxx.xxx.xxx.xxx","port": 9102,"tags": ["test"],"checks": [{"http": "http://xxx.xxx.xxx.xxx:9102/metrics","interval": "15s"}]}' https://your-consul-address/v1/agent/service/register
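Once the service is registered, Prometheus can also discover it through Consul rather than a static target. A rough sketch for prometheus.yml (untested here; the server address is a placeholder to adjust for your setup):

```yaml
- job_name: airflow
  consul_sd_configs:
    # Placeholder address; point this at your Consul HTTP API
    - server: 'xxx.xxx.xxx.xxx:8500'
      services: ['airflow']
```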

If Consul is not an option, configure Prometheus directly.

# I have not used this myself and have not actually tested it. It is roughly as follows; adjust as needed for your own situation.
- job_name: airflow
  static_configs:
    # The target is the statsd_exporter address (HTTP port 9102), not the StatsD port from airflow.cfg
    - targets: ['xxx.xxx.xxx.xxx:9102']

5. Configure Grafana's dashboard

Open an Airflow dashboard page on grafana.com and import it into Grafana via its URL or ID.
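The alerting half of the goal can be handled on the Prometheus side instead of (or in addition to) Grafana. A sketch of a rule that fires when the scheduler heartbeat stops (untested; the group name, threshold, and labels are illustrative):

```yaml
groups:
  - name: airflow-alerts            # illustrative group name
    rules:
      - alert: AirflowSchedulerDown
        # af_agg_scheduler_heartbeat comes from the mapping above;
        # a zero rate over 5 minutes means no heartbeats arrived.
        expr: rate(af_agg_scheduler_heartbeat[5m]) == 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Airflow scheduler heartbeat has stopped"
```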


That's all for this share. It was inspired by this blogger:
https://blog.csdn.net/qq_42586468/article/details/131530803
Even following that write-up I took many detours, and in the end made a few small optimizations on top of their work.