指标配置

Airflow 可以配置为将指标发送到 StatsDOpenTelemetry

设置 - StatsD

要使用 StatsD,您必须先安装所需的包

pip install 'apache-airflow[statsd]'

然后将以下行添加到您的配置文件,例如 airflow.cfg

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

如果您想使用自定义的 StatsD 客户端,而不是 Airflow 提供的默认客户端,需要在配置文件中添加以下键,并同时提供自定义 StatsD 客户端的模块路径。该模块必须在您的 PYTHONPATH 中可用。

[metrics]
statsd_custom_client_path = x.y.customclient

请参阅 模块管理,了解 Python 和 Airflow 如何管理模块的详细信息。

设置 - OpenTelemetry

要使用 OpenTelemetry,您必须先安装所需的包

pip install 'apache-airflow[otel]'

需要一个 OpenTelemetry Collector(或兼容服务)来连接到指标后端。将 Collector 的详细信息添加到您的配置文件,例如 airflow.cfg

[metrics]
otel_on = True
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 30000  # The interval between exports, defaults to 60000
otel_service = Airflow
otel_ssl_active = False

注意

以下配置键已被弃用,未来将被移除

[metrics]
otel_host = localhost
otel_port = 8889
otel_interval_milliseconds = 30000
otel_debugging_on = False
otel_service = Airflow
otel_ssl_active = False

OpenTelemetry SDK 应使用标准的 OpenTelemetry 环境变量进行配置,例如 OTEL_EXPORTER_OTLP_ENDPOINTOTEL_EXPORTER_OTLP_PROTOCOL 等。

请参阅 OpenTelemetry 导出器协议规范SDK 环境变量文档以获取更多信息。

启用 HTTPS

要建立到 OpenTelemetry Collector 的 HTTPS 连接,需要在 OpenTelemetry Collector 的 config.yml 文件中配置 SSL 证书和密钥。

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
        tls:
          cert_file: "/path/to/cert/cert.crt"
          key_file: "/path/to/key/key.pem"

允许/阻止 列表

如果您想避免发送所有可用的指标,可以配置前缀的允许列表或阻止列表,仅发送或阻止以列表中元素开头的指标。

[metrics]
metrics_allow_list = scheduler,executor,dagrun,pool,triggerer,celery
[metrics]
metrics_block_list = scheduler,executor,dagrun,pool,triggerer,celery

重命名指标

如果您想将指标重定向到不同的名称,可以在 `[metrics]` 部分配置 `stat_name_handler` 选项。该选项应指向一个函数,该函数验证统计名称、在必要时对其进行修改,并返回转换后的统计名称。函数示例如下

def my_custom_stat_name_handler(stat_name: str) -> str:
    return stat_name.lower()[:32]

其他配置选项

注意

有关指标的详细配置选项列表,请参见配置参考文档 - [metrics]

指标描述

计数器

名称

旧名称

描述

{job_name}_start

-

启动的 {job_name} 作业数量,例如 SchedulerJob、LocalTaskJob

{job_name}_end

-

结束的 {job_name} 作业数量,例如 SchedulerJob、LocalTaskJob

{job_name}_heartbeat_failure

-

{job_name} 作业的心跳失败次数,例如 SchedulerJob、LocalTaskJob

local_task_job.task_exit

local_task_job.task_exit.{job_id}.{dag_id}.{task_id}.{return_code}

在运行 Dag `{dag_id}` 的任务 `{task_id}` 时,`LocalTaskJob` 以 `{return_code}` 结束的次数。该指标带有 job_id、dag_id、task_id 和 return_code 标签。

operator_failures

operator_failures_{operator_name}

操作器 {operator_name} 的失败次数。

operator_successes

operator_successes_{operator_name}

操作器 {operator_name} 的成功次数。

ti_failures

-

任务实例整体失败次数。该指标带有 dag_id 和 task_id 标签。

ti_successes

-

任务实例整体成功次数。该指标带有 dag_id 和 task_id 标签。

previously_succeeded

-

先前成功的任务实例数量。该指标带有 dag_id 和 task_id 标签。

task_instances_without_heartbeats_killed

-

因缺少心跳而被终止的任务实例。该指标带有 dag_id 和 task_id 标签。

scheduler_heartbeat

-

调度器心跳

dag_processor_heartbeat

-

独立的 Dag 处理器心跳

dag_processing.processes

-

当前运行的 Dag 解析进程的相对数量(即自上一次发送指标后,若进程已完成,此增量为负)。该指标带有 file_path 和 action 标签。

dag_processing.processor_timeouts

-

因耗时过长而被终止的文件处理器数量。该指标带有 file_path 标签。

dag_processing.other_callback_count

-

收到的非 SLA 回调次数

dag_processing.callback_only_count

-

仅处理回调而未进行完整 DAG 解析的 DAG 文件处理运行次数

dag_processing.file_path_queue_update_count

-

扫描文件系统并将所有现有 DAG 排队的次数

dag_file_processor_timeouts

-

(已弃用) 行为同 dag_processing.processor_timeouts

dag_processing.manager_stalls

-

`DagFileProcessorManager` 的卡顿次数

dag_file_refresh_error

-

加载任何 Dag 文件失败的次数

scheduler.tasks.killed_externally

-

外部终止的任务数量。该指标带有 dag_id 和 task_id 标签。

scheduler.orphaned_tasks.cleared

-

调度器清除的孤立任务数量

scheduler.orphaned_tasks.adopted

-

调度器接管的孤立任务数量

scheduler.critical_section_busy

-

调度器进程尝试获取关键区锁(用于将任务发送到执行器)但发现已被其他进程锁定的次数

ti.start

ti.start.{dag_id}.{task_id}

给定 DAG 中已启动任务的数量。类似于 {job_name}_start,但针对任务。该指标带有 dag_id 和 task_id 标签。

ti.finish

ti.finish.{dag_id}.{task_id}.{state}

给定 DAG 中已完成任务的数量。类似于 {job_name}_end,但针对任务。该指标带有 dag_id 和 task_id 标签。

dag.callback_exceptions

-

Dag 回调抛出的异常次数。出现此情况表示 Dag 回调未工作。该指标带有 dag_id 标签

celery.task_timeout_error

-

将任务发布到 Celery Broker 时抛出的 AirflowTaskTimeout 错误次数

celery.execute_command.failure

-

Celery 任务返回非零退出码的次数

task_removed_from_dag

task_removed_from_dag.{dag_id}

给定 DAG 中被移除的任务数量(即任务在 DAG 中不再存在)。该指标带有 dag_id 和 run_type 标签。

task_restored_to_dag

task_restored_to_dag.{dag_id}

给定 DAG 中被恢复的任务数量(即先前在数据库中处于 REMOVED 状态的任务实例被重新加入 DAG 文件)。该指标带有 dag_id 和 run_type 标签。

task_instance_created

task_instance_created_{task_type}

给定操作器创建的任务实例数量。该指标带有 dag_id 和 run_type 标签。

triggerer_heartbeat

-

触发器心跳

triggers.blocked_main_thread

-

阻塞主线程的触发器数量(可能是因为未完全异步)

triggers.failed

-

在触发事件前出错的触发器数量

triggers.succeeded

-

已触发至少一次事件的触发器数量

asset.updates

-

已更新资产的数量

asset.triggered_dagruns

-

因资产更新而触发的 Dag 运行次数

deadline_alerts.deadline_created

-

为 DagRun 创建的截止日期警报数量

deadline_alerts.deadline_missed

-

因 DagRun 错过截止日期而触发的警报数量

deadline_alerts.deadline_not_missed

-

因 DagRun 在截止日期前完成而删除的截止记录数量

ol.emit.failed

-

OpenLineage 事件发送尝试失败的次数

edge_worker.heartbeat_count

edge_worker.heartbeat_count.{worker_name}

Edge Worker 的心跳次数

edge_worker.ti.start

edge_worker.ti.start.{queue}.{dag_id}.{task_id}

在 Edge Worker 上启动的任务实例数量

edge_worker.ti.finish

edge_worker.ti.finish.{queue}.{state}.{dag_id}.{task_id}

在 Edge Worker 上完成的任务实例数量

仪表

名称

旧名称

描述

asset.orphaned

-

因不再在 Dag 调度参数或任务输出中引用而被标记为孤立的资产数量

dagbag_size

-

调度器根据配置进行扫描时发现的 DAG 数量

dag_processing.import_errors

-

尝试解析 Dag 文件时的错误次数

dag_processing.total_parse_time

-

扫描并导入 dag_processing.file_path_queue_size 个 Dag 文件所耗时间(秒)

dag_processing.file_path_queue_size

-

下次扫描时要考虑的 Dag 文件数量

dag_processing.last_run.seconds_ago.{dag_file}

-

{dag_file} 上次处理已过去的秒数

dag_processing.last_num_of_db_queries.{dag_file}

-

每个 {dag_file} 在解析期间对 Airflow 数据库的查询次数

scheduler.tasks.starving

-

由于池中没有可用槽位而无法调度的任务数量

scheduler.tasks.executable

-

根据池限制、Dag 并发、执行器状态和优先级,已准备好执行(设为 queued)的任务数量

scheduler.dagruns.running

-

最新的 DagRun 当前处于 RUNNING 状态的 DAG 数量

executor.open_slots

executor.open_slots.{executor_class_name}

执行器的可用槽位数量。仅在配置多个执行器时才会发出此旧指标。

executor.queued_tasks

executor.queued_tasks.{executor_class_name}

执行器上排队任务的数量。仅在配置多个执行器时才会发出此旧指标。

executor.running_tasks

executor.running_tasks.{executor_class_name}

执行器上运行任务的数量。仅在配置多个执行器时才会发出此旧指标。

pool.open_slots

pool.open_slots.{pool_name}

池中的可用槽位数量

pool.queued_slots

pool.queued_slots.{pool_name}

池中排队的槽位数量

pool.running_slots

pool.running_slots.{pool_name}

池中运行中的槽位数量

pool.deferred_slots

pool.deferred_slots.{pool_name}

池中延迟的槽位数量

pool.scheduled_slots

pool.scheduled_slots.{pool_name}

池中已调度的槽位数量

pool.starving_tasks

pool.starving_tasks.{pool_name}

池中因缺少槽位而饥饿的任务数量

triggers.running

triggers.running.{hostname}

由触发器(由主机名描述)当前运行的触发器数量

triggerer.capacity_left

triggerer.capacity_left.{hostname}

触发器可用于运行触发器的剩余容量(由主机名描述)

ti.scheduled

ti.scheduled.{queue}.{dag_id}.{task_id}

给定 DAG 中已调度任务的数量

ti.queued

ti.queued.{queue}.{dag_id}.{task_id}

给定 DAG 中排队任务的数量

ti.running

ti.running.{queue}.{dag_id}.{task_id}

给定 DAG 中运行中的任务数量。由于 ti.start 和 ti.finish 可能不同步,此指标显示所有运行中的 ti。

ti.deferred

ti.deferred.{queue}.{dag_id}.{task_id}

给定 DAG 中延迟任务的数量

ol.event.size.{event_type}.{operator_name}

-

按事件类型和操作器划分的 OpenLineage 事件大小(字节)

edge_worker.connected

edge_worker.connected.{worker_name}

处于已连接状态的 Edge worker

edge_worker.maintenance

edge_worker.maintenance.{worker_name}

处于维护状态的 Edge worker

edge_worker.jobs_active

edge_worker.jobs_active.{worker_name}

Edge worker 中活动作业的数量

edge_worker.concurrency

edge_worker.concurrency.{worker_name}

Edge worker 的并发容量

edge_worker.free_concurrency

edge_worker.free_concurrency.{worker_name}

Edge worker 中可用的并发量

edge_worker.num_queues

edge_worker.num_queues.{worker_name}

Edge worker 中的队列数量

计时器

名称

旧名称

描述

dagrun.dependency-check

dagrun.dependency-check.{dag_id}

检查 Dag 依赖所用的毫秒数

task.duration

dag.{dag_id}.{task_id}.duration

运行任务所用的毫秒数

task.scheduled_duration

dag.{dag_id}.{task_id}.scheduled_duration

任务在 Scheduled 状态下停留的毫秒数,直至进入 Queued 状态

task.queued_duration

dag.{dag_id}.{task_id}.queued_duration

任务在 Queued 状态下停留的毫秒数,直至进入 Running 状态

dag_processing.last_duration

dag_processing.last_duration.{dag_file}

加载给定 Dag 文件所用的毫秒数

dagrun.duration.success

dagrun.duration.success.{dag_id}

DagRun 达到成功状态所用的毫秒数

dagrun.duration.failed

dagrun.duration.failed.{dag_id}

DagRun 达到失败状态所用的毫秒数

dagrun.schedule_delay

dagrun.schedule_delay.{dag_id}

计划的 DagRun 开始日期与实际 DagRun 开始日期之间的延迟毫秒数

scheduler.critical_section_duration

-

调度器循环关键区所消耗的毫秒数

scheduler.critical_section_query_duration

-

运行关键区任务实例查询所消耗的毫秒数

scheduler.scheduler_loop_duration

-

运行一次调度器循环所消耗的毫秒数

dagrun.first_task_scheduling_delay

dagrun.{dag_id}.first_task_scheduling_delay

首次任务 start_date 与 dagrun 预期开始之间经过的毫秒数

collect_db_dags

-

从数据库获取所有序列化 Dag 所用的毫秒数

kubernetes_executor.clear_not_launched_queued_tasks.duration

-

在 Kubernetes Executor 中清除未启动的排队任务所用的毫秒数

kubernetes_executor.adopt_task_instances.duration

-

在 Kubernetes Executor 中接管任务实例所用的毫秒数

batch_executor.adopt_task_instances.duration

-

在 AWS Batch Executor 中接管任务实例所用的毫秒数

ecs_executor.adopt_task_instances.duration

-

在 AWS ECS Executor 中接管任务实例所用的毫秒数

lambda_executor.adopt_task_instances.duration

-

在 AWS Lambda Executor 中接管任务实例所用的毫秒数

edge_executor.sync.duration

-

Edge Executor 的一次同步心跳所用的毫秒数

ol.emit.attempts

ol.emit.attempts.{event_type}.{transport_type}

一次发送 OpenLineage 事件尝试所用的毫秒数

ol.extract.{event_type}.{operator_name}

-

按事件类型和操作器提取 OpenLineage 事件所用的毫秒数

airflow.io.load_filesystems

-

从提供者加载文件系统实现所用的毫秒数

serde.load_serializers

-

加载所有序列化模块所用的毫秒数

此条目是否有帮助?