任务日志

Airflow 以一种方式写入任务日志,使您能够在 Airflow UI 中分别查看每个任务的日志。核心 Airflow 提供了 FileTaskHandler 接口,该接口将任务日志写入文件,并包含在任务运行时从工作节点提供日志的机制。Apache Airflow 社区还发布了许多服务的提供者,其中一些提供了扩展 Apache Airflow 日志功能的处理程序。您可以在日志写入 中看到所有这些提供者。

使用 S3、GCS、WASB、HDFS 或 OSS 远程日志服务时,您可以在日志文件上传到远程位置后删除本地日志文件,只需设置相应配置。

[logging]
remote_logging = True
remote_base_log_folder = schema://path/to/remote/log
delete_local_logs = True

配置日志

对于默认处理程序 FileTaskHandler,您可以在 airflow.cfg 中使用 base_log_folder 指定放置日志文件的目录。默认情况下,日志放在 AIRFLOW_HOME 目录中。

注意

有关设置配置的更多信息,请参见 设置配置选项

为任务命名日志文件时遵循默认模式

  • 普通任务:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log

  • 动态映射任务:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log

可以通过 log_filename_template 调整这些模式。

此外,您可以提供远程位置来存储当前日志和备份。

从代码写入任务日志

Airflow 使用标准的 Python logging 框架写入日志,在任务期间,根记录器被配置为写入任务日志。

大多数算子会自动将日志写入任务日志。这是因为它们拥有一个 log 属性(类型为 Logger),您可以使用它向任务日志写入。这种记录器会自动为所有继承自 BaseOperator 的算子进行配置。

此外,由于任务执行期间根记录器的配置,任何使用默认设置的标准 Python 记录器(会向根记录器传播)也会写入任务日志。

因此,如果您想从自定义代码向任务日志写入,可以采用以下任意方式

  • 使用 self.log 记录器(来自 BaseOperator)记录

  • 使用标准的 print 语句输出到 stdout(不推荐,但在某些情况下可能有用)

  • 使用标准日志记录方式:按 Python 模块名创建记录器并使用它写入任务日志

这是在 Python 代码中直接使用记录器的常用方式

import logging

logger = logging.getLogger(__name__)
logger.info("This is a log message")

日志行分组

添加于版本 2.9.0。

就像 CI 流水线一样,Airflow 的日志有时会非常庞大且难以阅读。因此,将日志的某些区域进行分组并提供可折叠的文本区域以隐藏不相关内容是很有用的。Airflow 实现了与 GitHubAzure DevOps 兼容的日志行分组功能,使得文本区域可以折叠。该方案兼容,使得在 CI 中输出的工具能够在 Airflow 中获得相同的体验。

通过添加带有起始和结束位置的日志标记,例如以下示例,日志消息可以被分组

print("Here is some standard text.")
print("::group::Non important details")
print("bla")
print("debug messages...")
print("::endgroup::")
print("Here is again some standard text.")

在 Web UI 中显示日志时,日志将被折叠展示

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯈ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

如果点击日志文本标签,将显示详细的日志行。

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯆ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - bla
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - debug messages...
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯅⯅⯅ Log group end
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

日志交错

Airflow 的远程任务日志处理程序大致可以分为两类:流式处理程序(如 ElasticSearch、AWS CloudWatch、GCP Operations Logging,前称 Stackdriver)和对象存储处理程序(如 S3、GCS、WASB)。

对于对象存储处理程序,依据任务状态,日志可能分散在多个位置和多个文件中。因此,需要检查所有位置并交错合并所找到的内容。为此必须能够解析每行的时间戳。如果您使用了自定义格式化器,可能需要通过在 Airflow 设置 [logging] interleave_timestamp_parser 中提供可调用名称来覆盖默认解析器。

对于流式处理程序,无论任务阶段或执行位置如何,所有日志消息都可以使用相同的标识符发送到日志服务,通常不需要检查多个来源并进行交错。

故障排查

如果想查看当前使用的任务处理程序,可以使用 airflow info 命令,如下例所示。

$ airflow info

Apache Airflow
version                | 2.9.0.dev0
executor               | LocalExecutor
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn       | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder            | /files/dags
plugins_folder         | /root/airflow/plugins
base_log_folder        | /root/airflow/logs
remote_base_log_folder |

[skipping the remaining outputs for brevity]

上面的 airflow info 输出已截断,仅显示与日志配置相关的部分。您也可以运行 airflow config list 检查日志配置选项是否具有有效值。

高级配置

您可以配置高级功能——包括添加自定义任务日志处理程序(也可以为所有 Airflow 组件添加日志处理程序),以及为算子、钩子和任务创建自定义日志处理程序。

从工作节点和触发器提供日志

大多数任务处理程序会在任务完成后发送日志。为了实时查看日志,Airflow 在以下情况下启动 HTTP 服务器来提供日志:

  • 如果使用 LocalExecutor,则在 airflow scheduler 正在运行时。

  • 如果使用 CeleryExecutor,则在 airflow worker 正在运行时。

在 triggerer 中,除非以 --skip-serve-logs 选项启动服务,否则会提供日志。

服务器运行的端口由 worker_log_server_port(在 [logging] 部分)以及用于 triggerer 的 triggerer_log_server_port 选项决定,默认分别为 8793 和 8794。Web 服务器与工作节点之间的通信使用 secret_key(在 [api] 部分)签名。您必须确保密钥匹配,以便通信顺畅。

我们使用 Gunicorn 作为 WSGI 服务器。其配置选项可以通过环境变量 GUNICORN_CMD_ARGS 覆盖。详情请参见 Gunicorn 设置

实现自定义文件任务处理程序

注意

这是一个高级主题,大多数用户只需使用日志写入 中已有的处理程序即可。

在我们的提供者中已经包含了主要云服务商的丰富选项。但如果您需要使用其他服务实现日志,并决定实现自定义 FileTaskHandler,则需要注意一些设置,尤其是在触发器日志的上下文中。

触发器对日志的设置方式有所不同。与任务不同,许多触发器在同一进程中运行,并且因为触发器在 asyncio 中运行,我们必须避免在日志处理程序中引入阻塞调用。由于处理程序行为多样(有的写入文件,有的上传到对象存储,有的实时发送网络消息,有的在子线程中完成),我们需要让 triggerer 知道如何使用它们。

为实现上述目标,处理程序可以在实例或类上设置以下属性。由于子类可能在相关特性上与 FileTaskHandler 不同,这些参数不遵循继承规则。下面对这些参数进行说明。

  • trigger_should_wrap:控制是否应由 TriggerHandlerWrapper 包装此处理程序。当每个实例创建一个文件处理程序并将所有消息写入该文件时,需要此设置。

  • trigger_should_queue:控制 triggerer 是否应在事件循环与处理程序之间放置 QueueListener,以确保处理程序中的阻塞 I/O 不会扰乱事件循环。

  • trigger_send_end_marker:控制在触发器完成时是否向记录器发送 END 信号。该信号用于通知包装器关闭并移除对应触发器的文件处理程序。

  • trigger_supported:如果 trigger_should_wraptrigger_should_queue 均为 False,则默认认为该处理程序不支持触发器。但如果此时该处理程序将 trigger_supported 设置为 True,Airflow 仍会在 triggerer 启动时将其移动到根记录器,以便处理触发器消息。实际上,这应当用于“原生”支持触发器的处理程序。例如 StackdriverTaskHandler

此条目是否有帮助?