2025 年 Airflow Summit 即将于 10 月 07-09 日举行。立即注册获取早鸟票!

指标配置

Airflow 可以配置为将指标发送到 StatsDOpenTelemetry

设置 - StatsD

要使用 StatsD,您必须先安装所需的软件包

pip install 'apache-airflow[statsd]'

然后将以下行添加到您的配置文件中,例如 airflow.cfg

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

如果您想使用自定义 StatsD 客户端而不是 Airflow 提供的默认客户端,则必须将以下键及其自定义 StatsD 客户端的模块路径添加到配置文件中。此模块必须在您的 PYTHONPATH 中可用。

[metrics]
statsd_custom_client_path = x.y.customclient

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅模块管理

设置 - OpenTelemetry

要使用 OpenTelemetry,您必须先安装所需的软件包

pip install 'apache-airflow[otel]'

将以下行添加到您的配置文件中,例如 airflow.cfg

[metrics]
otel_on = True
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 30000  # The interval between exports, defaults to 60000
otel_ssl_active = False

启用 Https

要与 OpenTelemetry collector 建立 HTTPS 连接,您需要在 OpenTelemetry collector 的 config.yml 文件中配置 SSL 证书和密钥。

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
        tls:
          cert_file: "/path/to/cert/cert.crt"
          key_file: "/path/to/key/key.pem"

允许/阻止列表

如果您想避免发送所有可用指标,您可以配置允许列表或阻止列表,以便只发送或阻止以列表元素开头的指标

[metrics]
metrics_allow_list = scheduler,executor,dagrun,pool,triggerer,celery
[metrics]
metrics_block_list = scheduler,executor,dagrun,pool,triggerer,celery

重命名指标

如果您想将指标重定向到不同的名称,您可以在 [metrics] 部分配置 stat_name_handler 选项。它应该指向一个函数,该函数验证统计名称,必要时应用更改,并返回转换后的统计名称。该函数可能如下所示

def my_custom_stat_name_handler(stat_name: str) -> str:
    return stat_name.lower()[:32]

其他配置选项

注意

有关指标配置选项的详细列表,请参阅配置参考文档 - [metrics]

指标描述

计数器

名称

描述

<job_name>_start

已启动的 <job_name> 作业数量,例如 SchedulerJob, LocalTaskJob

<job_name>_end

已结束的 <job_name> 作业数量,例如 SchedulerJob, LocalTaskJob

<job_name>_heartbeat_failure

<job_name> 作业心跳失败次数,例如 SchedulerJob, LocalTaskJob

local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code>

LocalTaskJob 在运行 DAG <dag_id> 的任务 <task_id> 时,以 <return_code> 终止的次数。

local_task_job.task_exit

LocalTaskJob 在运行 DAG <dag_id> 的任务 <task_id> 时,以 <return_code> 终止的次数。带有 job_id、dag_id、task_id 和 return_code 标签的指标。

operator_failures_<operator_name>

Operator <operator_name> 失败次数

operator_failures

Operator <operator_name> 失败次数。带有 operator_name 标签的指标。

operator_successes_<operator_name>

Operator <operator_name> 成功次数

operator_successes

Operator <operator_name> 成功次数。带有 operator_name 标签的指标。

ti_failures

总体任务实例失败次数。带有 dag_id 和 task_id 标签的指标。

ti_successes

总体任务实例成功次数。带有 dag_id 和 task_id 标签的指标。

previously_succeeded

先前成功完成的任务实例数量。带有 dag_id 和 task_id 标签的指标。

task_instances_without_heartbeats_killed

没有心跳的任务实例被终止。带有 dag_id 和 task_id 标签的指标。

scheduler_heartbeat

调度器心跳

dag_processor_heartbeat

独立 DAG 处理器心跳

dag_processing.processes

当前正在运行的 DAG 解析进程的相对数量(即自从上次发送指标以来,进程完成时,此增量为负)。带有 file_path 和 action 标签的指标。

dag_processing.processor_timeouts

由于耗时过长而被终止的文件处理器数量。带有 file_path 标签的指标。

dag_processing.other_callback_count

接收到的非 SLA 回调数量

dag_processing.file_path_queue_update_count

扫描文件系统并将所有现有 dag 加入队列的次数

dag_file_processor_timeouts

(已弃用) 与 dag_processing.processor_timeouts 行为相同

dag_processing.manager_stalls

DagFileProcessorManager 停滞次数

dag_file_refresh_error

加载任何 DAG 文件失败次数

scheduler.tasks.killed_externally

外部终止的任务数量。带有 dag_id 和 task_id 标签的指标。

scheduler.orphaned_tasks.cleared

调度器清除的孤立任务数量

scheduler.orphaned_tasks.adopted

调度器接管的孤立任务数量

scheduler.critical_section_busy

调度器进程尝试锁定关键部分(将任务发送到执行器所需)并发现已被其他进程锁定的次数。

ti.start.<dag_id>.<task_id>

给定 dag 中已启动的任务数量。类似于 <job_name>_start 但用于任务

ti.start

给定 dag 中已启动的任务数量。类似于 <job_name>_start 但用于任务。带有 dag_id 和 task_id 标签的指标。

ti.finish.<dag_id>.<task_id>.<state>

给定 dag 中已完成的任务数量。类似于 <job_name>_end 但用于任务

ti.finish

给定 dag 中已完成的任务数量。类似于 <job_name>_end 但用于任务。带有 dag_id 和 task_id 标签的指标。

dag.callback_exceptions

DAG 回调抛出的异常数量。发生这种情况时,表示 DAG 回调不起作用。带有 dag_id 标签的指标

celery.task_timeout_error

将任务发布到 Celery Broker 时抛出的 AirflowTaskTimeout 错误数量。

celery.execute_command.failure

Celery 任务的非零退出码数量。

task_removed_from_dag.<dag_id>

给定 dag 中已删除的任务数量(即任务不再存在于 DAG 中)。

task_removed_from_dag

给定 dag 中已删除的任务数量(即任务不再存在于 DAG 中)。带有 dag_id 和 run_type 标签的指标。

task_restored_to_dag.<dag_id>

给定 dag 中已恢复的任务数量(即先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中)

task_restored_to_dag.<dag_id>

给定 dag 中已恢复的任务数量(即先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中)。带有 dag_id 和 run_type 标签的指标。

task_instance_created_<operator_name>

给定 Operator 创建的任务实例数量

task_instance_created

给定 Operator 创建的任务实例数量。带有 dag_id 和 run_type 标签的指标。

triggerer_heartbeat

触发器心跳

triggers.blocked_main_thread

阻塞主线程的触发器数量(可能是由于不是完全异步)

triggers.failed

在触发事件之前出错的触发器数量

triggers.succeeded

至少触发了一个事件的触发器数量

asset.updates

已更新资产数量

asset.orphaned

标记为孤立资产的数量,因为它们不再在 DAG 调度参数或任务输出中被引用

asset.triggered_dagruns

由资产更新触发的 DAG 运行数量

计量器

名称

描述

dagbag_size

调度器根据其配置执行扫描时找到的 dag 数量

dag_processing.import_errors

尝试解析 DAG 文件时发生的错误数量

dag_processing.total_parse_time

扫描和导入 dag_processing.file_path_queue_size 个 DAG 文件所花费的秒数

dag_processing.file_path_queue_size

下次扫描要考虑的 DAG 文件数量

dag_processing.last_run.seconds_ago.<dag_file>

自上次处理 <dag_file> 以来的秒数

dag_processing.last_num_of_db_queries.<dag_file>

解析每个 <dag_file> 期间对 Airflow 数据库的查询次数

scheduler.tasks.starving

由于资源池中没有可用槽位而无法调度的任务数量

scheduler.tasks.executable

根据资源池限制、DAG 并发、执行器状态和优先级,已准备好执行(设置为 queued)的任务数量。

executor.open_slots.<executor_class_name>

特定执行器上的可用槽位数量。仅在配置了多个执行器时发出。

executor.open_slots

执行器上的可用槽位数量

executor.queued_tasks.<executor_class_name>

特定执行器上的排队任务数量。仅在配置了多个执行器时发出。

executor.queued_tasks

执行器上的排队任务数量

executor.running_tasks.<executor_class_name>

特定执行器上正在运行的任务数量。仅在配置了多个执行器时发出。

executor.running_tasks

执行器上正在运行的任务数量

pool.open_slots.<pool_name>

资源池中的可用槽位数量

pool.open_slots

资源池中的可用槽位数量。带有 pool_name 标签的指标。

pool.queued_slots.<pool_name>

资源池中的排队槽位数量

pool.queued_slots

资源池中的排队槽位数量。带有 pool_name 标签的指标。

pool.running_slots.<pool_name>

资源池中正在运行的槽位数量

pool.running_slots

资源池中正在运行的槽位数量。带有 pool_name 标签的指标。

pool.deferred_slots.<pool_name>

资源池中已延迟的槽位数量

pool.deferred_slots

资源池中已延迟的槽位数量。带有 pool_name 标签的指标。

pool.scheduled_slots.<pool_name>

资源池中已调度的槽位数量

pool.scheduled_slots

资源池中已调度的槽位数量。带有 pool_name 标签的指标。

pool.starving_tasks.<pool_name>

资源池中饥饿任务的数量

pool.starving_tasks

资源池中饥饿任务的数量。带有 pool_name 标签的指标。

task.cpu_usage.<dag_id>.<task_id>

任务使用的 CPU 百分比

task.mem_usage.<dag_id>.<task_id>

任务使用的内存百分比

triggers.running.<hostname>

正在为某个触发器运行的触发器数量(由 hostname 描述)

triggers.running

正在为某个触发器运行的触发器数量(由 hostname 描述)。带有 hostname 标签的指标。

triggerer.capacity_left.<hostname>

触发器运行触发器的剩余容量(由 hostname 描述)

triggerer.capacity_left

触发器运行触发器的剩余容量(由 hostname 描述)。带有 hostname 标签的指标。

ti.running.<queue>.<dag_id>.<task_id>

给定 dag 中正在运行的任务数量。由于 ti.start 和 ti.finish 可能不同步,此指标显示所有正在运行的 ti。

ti.running

给定 dag 中正在运行的任务数量。由于 ti.start 和 ti.finish 可能不同步,此指标显示所有正在运行的 ti。带有 queue、dag_id 和 task_id 标签的指标。

计时器

名称

描述

dagrun.dependency-check.<dag_id>

检查 DAG 依赖项所花费的毫秒数

dagrun.dependency-check

检查 DAG 依赖项所花费的毫秒数。带有 dag_id 标签的指标。

dag.<dag_id>.<task_id>.duration

运行任务所花费的毫秒数

task.duration

运行任务所花费的毫秒数。带有 dag_id 和 task-id 标签的指标。

dag.<dag_id>.<task_id>.scheduled_duration

任务在 Scheduled 状态花费的毫秒数,然后进入 Queued 状态

task.scheduled_duration

任务在 Scheduled 状态花费的毫秒数,然后进入 Queued 状态。带有 dag_id 和 task_id 标签的指标。

dag.<dag_id>.<task_id>.queued_duration

任务在 Queued 状态花费的毫秒数,然后进入 Running 状态

task.queued_duration

任务在 Queued 状态花费的毫秒数,然后进入 Running 状态。带有 dag_id 和 task_id 标签的指标。

dag_processing.last_duration.<dag_file>

加载给定 DAG 文件所花费的毫秒数

dag_processing.last_duration

加载给定 DAG 文件所花费的毫秒数。带有 file_name 标签的指标。

dagrun.duration.success.<dag_id>

DagRun 达到成功状态所花费的毫秒数

dagrun.duration.success

DagRun 达到成功状态所花费的毫秒数。带有 dag_id 和 run_type 标签的指标。

dagrun.duration.failed.<dag_id>

DagRun 达到失败状态所花费的毫秒数

dagrun.duration.failed

DagRun 达到失败状态所花费的毫秒数。带有 dag_id 和 run_type 标签的指标。

dagrun.schedule_delay.<dag_id>

计划的 DagRun 开始日期与实际 DagRun 开始日期之间的延迟毫秒数

dagrun.schedule_delay

计划的 DagRun 开始日期与实际 DagRun 开始日期之间的延迟毫秒数。带有 dag_id 标签的指标。

scheduler.critical_section_duration

调度器循环关键部分花费的毫秒数 – 每次只有一个调度器可以进入此循环

scheduler.critical_section_query_duration

运行关键部分任务实例查询花费的毫秒数

scheduler.scheduler_loop_duration

运行一次调度器循环花费的毫秒数

dagrun.<dag_id>.first_task_scheduling_delay

第一个任务 start_date 与 dagrun 预期开始时间之间的毫秒数

dagrun.first_task_scheduling_delay

第一个任务 start_date 与 dagrun 预期开始时间之间的毫秒数。带有 dag_id 和 run_type 标签的指标。

collect_db_dags

从数据库中获取所有序列化 Dag 所花费的毫秒数

kubernetes_executor.clear_not_launched_queued_tasks.duration

在 Kubernetes Executor 中清除未启动的排队任务所花费的毫秒数

kubernetes_executor.adopt_task_instances.duration

在 Kubernetes Executor 中接管任务实例所花费的毫秒数

此条目有帮助吗?