指标配置¶
Airflow 可以配置为将指标发送到 StatsD 或 OpenTelemetry。
设置 - StatsD¶
要使用 StatsD,您必须先安装所需的软件包
pip install 'apache-airflow[statsd]'
然后将以下行添加到您的配置文件中,例如 airflow.cfg
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
如果您想使用自定义 StatsD 客户端而不是 Airflow 提供的默认客户端,则必须将以下键及其自定义 StatsD 客户端的模块路径添加到配置文件中。此模块必须在您的 PYTHONPATH
中可用。
[metrics]
statsd_custom_client_path = x.y.customclient
有关 Python 和 Airflow 如何管理模块的详细信息,请参阅模块管理。
设置 - OpenTelemetry¶
要使用 OpenTelemetry,您必须先安装所需的软件包
pip install 'apache-airflow[otel]'
将以下行添加到您的配置文件中,例如 airflow.cfg
[metrics]
otel_on = True
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 30000 # The interval between exports, defaults to 60000
otel_ssl_active = False
启用 Https¶
要与 OpenTelemetry collector 建立 HTTPS 连接,您需要在 OpenTelemetry collector 的 config.yml
文件中配置 SSL 证书和密钥。
receivers:
otlp:
protocols:
http:
endpoint: 0.0.0.0:4318
tls:
cert_file: "/path/to/cert/cert.crt"
key_file: "/path/to/key/key.pem"
允许/阻止列表¶
如果您想避免发送所有可用指标,您可以配置允许列表或阻止列表,以便只发送或阻止以列表元素开头的指标
[metrics]
metrics_allow_list = scheduler,executor,dagrun,pool,triggerer,celery
[metrics]
metrics_block_list = scheduler,executor,dagrun,pool,triggerer,celery
重命名指标¶
如果您想将指标重定向到不同的名称,您可以在 [metrics]
部分配置 stat_name_handler
选项。它应该指向一个函数,该函数验证统计名称,必要时应用更改,并返回转换后的统计名称。该函数可能如下所示
def my_custom_stat_name_handler(stat_name: str) -> str:
return stat_name.lower()[:32]
其他配置选项¶
注意
有关指标配置选项的详细列表,请参阅配置参考文档 - [metrics]。
指标描述¶
计数器¶
名称 |
描述 |
---|---|
|
已启动的 |
|
已结束的 |
|
|
|
LocalTaskJob 在运行 DAG |
|
LocalTaskJob 在运行 DAG |
|
Operator |
|
Operator |
|
Operator |
|
Operator |
|
总体任务实例失败次数。带有 dag_id 和 task_id 标签的指标。 |
|
总体任务实例成功次数。带有 dag_id 和 task_id 标签的指标。 |
|
先前成功完成的任务实例数量。带有 dag_id 和 task_id 标签的指标。 |
|
没有心跳的任务实例被终止。带有 dag_id 和 task_id 标签的指标。 |
|
调度器心跳 |
|
独立 DAG 处理器心跳 |
|
当前正在运行的 DAG 解析进程的相对数量(即自从上次发送指标以来,进程完成时,此增量为负)。带有 file_path 和 action 标签的指标。 |
|
由于耗时过长而被终止的文件处理器数量。带有 file_path 标签的指标。 |
|
接收到的非 SLA 回调数量 |
|
扫描文件系统并将所有现有 dag 加入队列的次数 |
|
(已弃用) 与 |
|
DagFileProcessorManager 停滞次数 |
|
加载任何 DAG 文件失败次数 |
|
外部终止的任务数量。带有 dag_id 和 task_id 标签的指标。 |
|
调度器清除的孤立任务数量 |
|
调度器接管的孤立任务数量 |
|
调度器进程尝试锁定关键部分(将任务发送到执行器所需)并发现已被其他进程锁定的次数。 |
|
给定 dag 中已启动的任务数量。类似于 |
|
给定 dag 中已启动的任务数量。类似于 |
|
给定 dag 中已完成的任务数量。类似于 |
|
给定 dag 中已完成的任务数量。类似于 |
|
DAG 回调抛出的异常数量。发生这种情况时,表示 DAG 回调不起作用。带有 dag_id 标签的指标 |
|
将任务发布到 Celery Broker 时抛出的 |
|
Celery 任务的非零退出码数量。 |
|
给定 dag 中已删除的任务数量(即任务不再存在于 DAG 中)。 |
|
给定 dag 中已删除的任务数量(即任务不再存在于 DAG 中)。带有 dag_id 和 run_type 标签的指标。 |
|
给定 dag 中已恢复的任务数量(即先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中) |
|
给定 dag 中已恢复的任务数量(即先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中)。带有 dag_id 和 run_type 标签的指标。 |
|
给定 Operator 创建的任务实例数量 |
|
给定 Operator 创建的任务实例数量。带有 dag_id 和 run_type 标签的指标。 |
|
触发器心跳 |
|
阻塞主线程的触发器数量(可能是由于不是完全异步) |
|
在触发事件之前出错的触发器数量 |
|
至少触发了一个事件的触发器数量 |
|
已更新资产数量 |
|
标记为孤立资产的数量,因为它们不再在 DAG 调度参数或任务输出中被引用 |
|
由资产更新触发的 DAG 运行数量 |
计量器¶
名称 |
描述 |
---|---|
|
调度器根据其配置执行扫描时找到的 dag 数量 |
|
尝试解析 DAG 文件时发生的错误数量 |
|
扫描和导入 |
|
下次扫描要考虑的 DAG 文件数量 |
|
自上次处理 |
|
解析每个 |
|
由于资源池中没有可用槽位而无法调度的任务数量 |
|
根据资源池限制、DAG 并发、执行器状态和优先级,已准备好执行(设置为 queued)的任务数量。 |
|
特定执行器上的可用槽位数量。仅在配置了多个执行器时发出。 |
|
执行器上的可用槽位数量 |
|
特定执行器上的排队任务数量。仅在配置了多个执行器时发出。 |
|
执行器上的排队任务数量 |
|
特定执行器上正在运行的任务数量。仅在配置了多个执行器时发出。 |
|
执行器上正在运行的任务数量 |
|
资源池中的可用槽位数量 |
|
资源池中的可用槽位数量。带有 pool_name 标签的指标。 |
|
资源池中的排队槽位数量 |
|
资源池中的排队槽位数量。带有 pool_name 标签的指标。 |
|
资源池中正在运行的槽位数量 |
|
资源池中正在运行的槽位数量。带有 pool_name 标签的指标。 |
|
资源池中已延迟的槽位数量 |
|
资源池中已延迟的槽位数量。带有 pool_name 标签的指标。 |
|
资源池中已调度的槽位数量 |
|
资源池中已调度的槽位数量。带有 pool_name 标签的指标。 |
|
资源池中饥饿任务的数量 |
|
资源池中饥饿任务的数量。带有 pool_name 标签的指标。 |
|
任务使用的 CPU 百分比 |
|
任务使用的内存百分比 |
|
正在为某个触发器运行的触发器数量(由 hostname 描述) |
|
正在为某个触发器运行的触发器数量(由 hostname 描述)。带有 hostname 标签的指标。 |
|
触发器运行触发器的剩余容量(由 hostname 描述) |
|
触发器运行触发器的剩余容量(由 hostname 描述)。带有 hostname 标签的指标。 |
|
给定 dag 中正在运行的任务数量。由于 ti.start 和 ti.finish 可能不同步,此指标显示所有正在运行的 ti。 |
|
给定 dag 中正在运行的任务数量。由于 ti.start 和 ti.finish 可能不同步,此指标显示所有正在运行的 ti。带有 queue、dag_id 和 task_id 标签的指标。 |
计时器¶
名称 |
描述 |
---|---|
|
检查 DAG 依赖项所花费的毫秒数 |
|
检查 DAG 依赖项所花费的毫秒数。带有 dag_id 标签的指标。 |
|
运行任务所花费的毫秒数 |
|
运行任务所花费的毫秒数。带有 dag_id 和 task-id 标签的指标。 |
|
任务在 Scheduled 状态花费的毫秒数,然后进入 Queued 状态 |
|
任务在 Scheduled 状态花费的毫秒数,然后进入 Queued 状态。带有 dag_id 和 task_id 标签的指标。 |
|
任务在 Queued 状态花费的毫秒数,然后进入 Running 状态 |
|
任务在 Queued 状态花费的毫秒数,然后进入 Running 状态。带有 dag_id 和 task_id 标签的指标。 |
|
加载给定 DAG 文件所花费的毫秒数 |
|
加载给定 DAG 文件所花费的毫秒数。带有 file_name 标签的指标。 |
|
DagRun 达到成功状态所花费的毫秒数 |
|
DagRun 达到成功状态所花费的毫秒数。带有 dag_id 和 run_type 标签的指标。 |
|
DagRun 达到失败状态所花费的毫秒数 |
|
DagRun 达到失败状态所花费的毫秒数。带有 dag_id 和 run_type 标签的指标。 |
|
计划的 DagRun 开始日期与实际 DagRun 开始日期之间的延迟毫秒数 |
|
计划的 DagRun 开始日期与实际 DagRun 开始日期之间的延迟毫秒数。带有 dag_id 标签的指标。 |
|
调度器循环关键部分花费的毫秒数 – 每次只有一个调度器可以进入此循环 |
|
运行关键部分任务实例查询花费的毫秒数 |
|
运行一次调度器循环花费的毫秒数 |
|
第一个任务 start_date 与 dagrun 预期开始时间之间的毫秒数 |
|
第一个任务 start_date 与 dagrun 预期开始时间之间的毫秒数。带有 dag_id 和 run_type 标签的指标。 |
|
从数据库中获取所有序列化 Dag 所花费的毫秒数 |
|
在 Kubernetes Executor 中清除未启动的排队任务所花费的毫秒数 |
|
在 Kubernetes Executor 中接管任务实例所花费的毫秒数 |