Airflow integrates Grafana (Prometheus)
Use Grafana to monitor Airflow metrics and set up alerting.
Environment preparation
Airflow, Grafana, Prometheus, and Consul have been deployed in advance.
(I installed Consul myself; without it, every newly added server would require a Prometheus restart.)
Installation steps
1. Configure airflow.cfg
```ini
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 9125
statsd_prefix = airflow
statsd_allow_list =
- Airflow sends its StatsD metrics to port 9125
- statsd_allow_list: configure according to your own needs; typically scheduler, executor, and dagrun
- See the https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html documentation for details
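To see what the allow-list actually does, here is a toy sketch of the documented prefix-filtering behavior (this is not Airflow's real code, just an illustration: a metric is kept only if its name starts with one of the listed prefixes, and an empty list allows everything):

```python
# Toy illustration of statsd_allow_list prefix filtering (NOT Airflow's code).
def is_allowed(metric_name: str, allow_list: list) -> bool:
    """Return True if the metric passes the allow-list filter."""
    if not allow_list:  # empty statsd_allow_list => every metric is sent
        return True
    return any(metric_name.startswith(prefix) for prefix in allow_list)

allow = ["scheduler", "executor", "dagrun"]
print(is_allowed("scheduler_heartbeat", allow))  # True
print(is_allowed("ti_successes", allow))         # False, filtered out
```

This is why leaving `statsd_allow_list` empty, as in the config above, exposes all metrics.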
2. Here statsd-exporter is started with Docker; you can also download it yourself from GitHub
```yaml
# Create a file named docker-compose.yml
version: '2.1'
services:
  webserver:
    image: prom/statsd-exporter:v0.22.3
    restart: always
    volumes:
      # map the local file /data/statsd_export/statsd_mapping.yml into the container
      - /data/statsd_export/statsd_mapping.yml:/tmp/statsd_mapping.yml
    ports:
      - 9102:9102
      - 9125:9125
      - 9125:9125/udp
    command: "--statsd.mapping-config=/tmp/statsd_mapping.yml"
```
The following is the statsd_mapping.yml configuration, copied directly from the mapping file published on GitHub:
```yaml
mappings:
  # Airflow StatsD metrics mappings
  # (https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html)
  # === Counters ===
  - match: "(.+)\\.(.+)_start$"
    match_metric_type: counter
    name: "af_agg_job_start"
    match_type: regex
    labels:
      airflow_id: "$1"
      job_name: "$2"
  - match: "(.+)\\.(.+)_end$"
    match_metric_type: counter
    name: "af_agg_job_end"
    match_type: regex
    labels:
      airflow_id: "$1"
      job_name: "$2"
  - match: "(.+)\\.operator_failures_(.+)$"
    match_metric_type: counter
    name: "af_agg_operator_failures"
    match_type: regex
    labels:
      airflow_id: "$1"
      operator_name: "$2"
  - match: "(.+)\\.operator_successes_(.+)$"
    match_metric_type: counter
    name: "af_agg_operator_successes"
    match_type: regex
    labels:
      airflow_id: "$1"
      operator_name: "$2"
  - match: "*.ti_failures"
    match_metric_type: counter
    name: "af_agg_ti_failures"
    labels:
      airflow_id: "$1"
  - match: "*.ti_successes"
    match_metric_type: counter
    name: "af_agg_ti_successes"
    labels:
      airflow_id: "$1"
  - match: "*.zombies_killed"
    match_metric_type: counter
    name: "af_agg_zombies_killed"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler_heartbeat"
    match_metric_type: counter
    name: "af_agg_scheduler_heartbeat"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.processes"
    match_metric_type: counter
    name: "af_agg_dag_processing_processes"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.killed_externally"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_killed_externally"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.running"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_running"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.starving"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_starving"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.orphaned_tasks.cleared"
    match_metric_type: counter
    name: "af_agg_scheduler_orphaned_tasks_cleared"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.orphaned_tasks.adopted"
    match_metric_type: counter
    name: "af_agg_scheduler_orphaned_tasks_adopted"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.critical_section_busy"
    match_metric_type: counter
    name: "af_agg_scheduler_critical_section_busy"
    labels:
      airflow_id: "$1"
  - match: "*.sla_email_notification_failure"
    match_metric_type: counter
    name: "af_agg_sla_email_notification_failure"
    labels:
      airflow_id: "$1"
  - match: "*.ti.start.*.*"
    match_metric_type: counter
    name: "af_agg_ti_start"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
  - match: "*.ti.finish.*.*.*"
    match_metric_type: counter
    name: "af_agg_ti_finish"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
      state: "$4"
  - match: "*.dag.callback_exceptions"
    match_metric_type: counter
    name: "af_agg_dag_callback_exceptions"
    labels:
      airflow_id: "$1"
  - match: "*.celery.task_timeout_error"
    match_metric_type: counter
    name: "af_agg_celery_task_timeout_error"
    labels:
      airflow_id: "$1"
  # === Gauges ===
  - match: "*.dagbag_size"
    match_metric_type: gauge
    name: "af_agg_dagbag_size"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.import_errors"
    match_metric_type: gauge
    name: "af_agg_dag_processing_import_errors"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.total_parse_time"
    match_metric_type: gauge
    name: "af_agg_dag_processing_total_parse_time"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.last_runtime.*"
    match_metric_type: gauge
    name: "af_agg_dag_processing_last_runtime"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dag_processing.last_run.seconds_ago.*"
    match_metric_type: gauge
    name: "af_agg_dag_processing_last_run_seconds"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dag_processing.processor_timeouts"
    match_metric_type: gauge
    name: "af_agg_dag_processing_processor_timeouts"
    labels:
      airflow_id: "$1"
  - match: "*.executor.open_slots"
    match_metric_type: gauge
    name: "af_agg_executor_open_slots"
    labels:
      airflow_id: "$1"
  - match: "*.executor.queued_tasks"
    match_metric_type: gauge
    name: "af_agg_executor_queued_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.executor.running_tasks"
    match_metric_type: gauge
    name: "af_agg_executor_running_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.pool.open_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_open_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.queued_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_queued_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.running_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_running_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.starving_tasks.*"
    match_metric_type: gauge
    name: "af_agg_pool_starving_tasks"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.smart_sensor_operator.poked_tasks"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.poked_success"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_success"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.poked_exception"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_exception"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.exception_failures"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_exception_failures"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.infra_failures"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_infra_failures"
    labels:
      airflow_id: "$1"
  # === Timers ===
  - match: "*.dagrun.dependency-check.*"
    match_metric_type: observer
    name: "af_agg_dagrun_dependency_check"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dag.*.*.duration"
    match_metric_type: observer
    name: "af_agg_dag_task_duration"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
  - match: "*.dag_processing.last_duration.*"
    match_metric_type: observer
    name: "af_agg_dag_processing_duration"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dagrun.duration.success.*"
    match_metric_type: observer
    name: "af_agg_dagrun_duration_success"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dagrun.duration.failed.*"
    match_metric_type: observer
    name: "af_agg_dagrun_duration_failed"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dagrun.schedule_delay.*"
    match_metric_type: observer
    name: "af_agg_dagrun_schedule_delay"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.scheduler.critical_section_duration"
    match_metric_type: observer
    name: "af_agg_scheduler_critical_section_duration"
    labels:
      airflow_id: "$1"
  - match: "*.dagrun.*.first_task_scheduling_delay"
    match_metric_type: observer
    name: "af_agg_dagrun_first_task_scheduling_delay"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
```
- Start command

```shell
# I won't go into details about the Docker configuration file above.
docker-compose up -d
```
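To make the mapping rules above concrete, here is a toy sketch (not statsd_exporter's actual code) of how a glob rule such as `*.pool.open_slots.*` turns a dotted StatsD name into a Prometheus metric name plus labels: each `*` captures one dot-separated segment, and the captures fill the `$1`, `$2`, ... placeholders.

```python
# Toy sketch of statsd_exporter glob matching (illustrative only).
import re

def apply_glob_rule(metric, match, name, labels):
    """Apply one glob mapping rule; return {name, labels} or None on no match."""
    # Translate the glob into a regex: each '*' captures one metric segment.
    pattern = "^" + re.escape(match).replace(r"\*", r"([^.]+)") + "$"
    m = re.match(pattern, metric)
    if m is None:
        return None
    resolved = {}
    for key, value in labels.items():
        # "$1" -> first captured segment, "$2" -> second, ...
        resolved[key] = re.sub(r"\$(\d+)", lambda g: m.group(int(g.group(1))), value)
    return {"name": name, "labels": resolved}

result = apply_glob_rule(
    "airflow.pool.open_slots.default_pool",
    match="*.pool.open_slots.*",
    name="af_agg_pool_open_slots",
    labels={"airflow_id": "$1", "pool_name": "$2"},
)
print(result)
# {'name': 'af_agg_pool_open_slots', 'labels': {'airflow_id': 'airflow', 'pool_name': 'default_pool'}}
```

Note how `airflow_id` always ends up as the statsd_prefix configured in airflow.cfg, which is why every rule in the mapping file carries it as `$1`.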
3. After the configuration is complete and Docker has started, restart Airflow as well
Check http://ip:9102/metrics; if information like the following appears, the configuration is correct.
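One way to check programmatically is to fetch the page (e.g. `curl -s http://xxx.xxx.xxx.xxx:9102/metrics`) and look for the mapped `af_agg_` names. A small sketch of parsing the Prometheus text format (the sample exposition below is illustrative):

```python
# Sketch: extract metric names from Prometheus text-format exposition,
# as served by statsd-exporter on :9102/metrics.

def metric_names(exposition: str) -> set:
    """Collect metric names from Prometheus text-format output."""
    names = set()
    for line in exposition.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and # HELP / # TYPE comments
        # the metric name is everything before the first '{' or space
        names.add(line.split("{")[0].split(" ")[0])
    return names

sample = """\
# HELP af_agg_scheduler_heartbeat Metric autogenerated by statsd_exporter.
# TYPE af_agg_scheduler_heartbeat counter
af_agg_scheduler_heartbeat{airflow_id="airflow"} 42
af_agg_dagbag_size{airflow_id="airflow"} 7
"""
print(metric_names(sample))
```

If the set contains the `af_agg_` metrics you expect, the exporter-side mapping is working.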
4. I use Consul for unified management, so my command is

```shell
curl -X PUT -d '{"id": "airflow","name": "airflow","address": "xxx.xxx.xxx.xxx","port": 9102,"tags": ["test"],"checks": [{"http": "http://xxx.xxx.xxx.xxx:9102/metrics","interval": "15s"}]}' https://<your-consul-address>/v1/agent/service/register
```
If Consul is not in use, configure Prometheus directly.

```yaml
# I have not actually tested this; it is roughly like this.
# Adjust it appropriately according to your own situation.
- job_name: airflow
  static_configs:
    # Fill in the address of the statsd-exporter configured for airflow.cfg
    - targets: ['xxx.xxx.xxx.xxx:9102']
```
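Since the exporter was registered in Consul in step 4, Prometheus can also discover it through Consul instead of static targets. A hedged sketch (untested; the server address is a placeholder, and the service name matches the `"name": "airflow"` used in the registration call above):

```yaml
# Sketch: Consul-based service discovery for the statsd-exporter service
# registered above. Replace <your-consul-address> with your own deployment.
- job_name: airflow
  consul_sd_configs:
    - server: '<your-consul-address>:8500'
      services: ['airflow']   # the service name used in the registration call
```

With this, newly registered exporters are picked up without restarting Prometheus, which is the reason Consul was introduced in the environment preparation.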
5. Configure grafana’s dashboard
airflow dashboard address
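If you build panels by hand instead of importing a ready-made dashboard, a few example PromQL queries over the mapped metric names (these assume the statsd_mapping.yml above and are untested sketches):

```promql
# Task-instance failures per minute
rate(af_agg_ti_failures[5m]) * 60

# Scheduler heartbeat rate (alert if this drops to zero)
rate(af_agg_scheduler_heartbeat[5m])

# Open slots per pool
af_agg_pool_open_slots
```

Grafana alert rules on these expressions give the alerting mentioned at the start.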
That completes this share. It was inspired by:
https://blog.csdn.net/qq_42586468/article/details/131530803
I took quite a few detours while following that blogger's steps, and in the end optimized things a little on top of their work.