任务日志记录

Airflow 以一种允许你在 Airflow UI 中单独查看每个任务日志的方式编写任务日志。核心 Airflow 提供了一个接口 FileTaskHandler,它将任务日志写入文件,并包括一种机制,以便在任务运行时从工作进程提供这些日志。Apache Airflow 社区还为许多服务发布了提供程序 (提供程序包),其中一些提供程序提供了扩展 Apache Airflow 日志记录功能的处理程序。你可以在 编写日志 中查看所有这些提供程序。

在使用 S3、GCS、WASB 或 OSS 远程日志记录服务时,可以通过设置配置在将本地日志文件上传到远程位置后删除它们

[logging]
remote_logging = True
remote_base_log_folder = schema://path/to/remote/log
delete_local_logs = True

配置日志记录

对于默认处理程序 FileTaskHandler,可以使用 base_log_folderairflow.cfg 中指定放置日志文件的目录。默认情况下,日志放置在 AIRFLOW_HOME 目录中。

注意

有关设置配置的更多信息,请参阅 设置配置选项

为任务命名日志文件时遵循默认模式

  • 对于普通任务:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log

  • 对于动态映射的任务:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log

可以通过 log_filename_template 调整这些模式。

此外,还可以提供一个远程位置来存储当前日志和备份。

从代码中写入任务日志

Airflow 使用标准 Python 日志记录 框架来写入日志,并且在任务持续期间,根记录器被配置为写入任务的日志。

大多数运算符会自动将日志写入任务日志。这是因为它们有一个 log 记录器,你可以使用它写入任务日志。此记录器由 LoggingMixin 创建和配置,所有运算符都派生自该记录器。但是,由于根记录器处理,任何将日志记录传播到根的标准记录器(使用默认设置)也会写入任务日志。

因此,如果你想从自定义代码记录到任务日志,可以执行以下任一操作

  • 使用 BaseOperator 中的 self.log 记录器记录

  • 使用标准 print 语句打印到 stdout(不推荐,但在某些情况下可能有用)

  • 使用标准记录器方法,使用 Python 模块名称创建记录器并使用它写入任务日志

这是在 Python 代码中直接使用记录器的常用方式

import logging

logger = logging.getLogger(__name__)
logger.info("This is a log message")

日志行分组

2.9.0 版中的新增功能。

与 CI 管道类似,Airflow 日志也可能非常大,难以阅读。因此,有时将日志区域的部分分组并提供文本区域折叠以隐藏不相关内容非常有用。因此,Airflow 实施了与 GithubAzure DevOps 兼容的日志消息分组,以便可以折叠文本区域。实施的方案是兼容的,以便在 CI 中生成输出的工具可以直接在 Airflow 中利用相同的体验。

通过添加带有开始和结束位置的日志标记,例如以下日志消息可以分组

print("Here is some standard text.")
print("::group::Non important details")
print("bla")
print("debug messages...")
print("::endgroup::")
print("Here is again some standard text.")

在 Web UI 中显示日志时,日志的显示将被压缩

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯈ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

如果您单击日志文本标签,将显示详细的日志行。

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯆ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - bla
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - debug messages...
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯅⯅⯅ Log group end
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

日志交错

Airflow 的远程任务日志处理程序大致可以分为两类:流处理程序(例如 ElasticSearch、AWS Cloudwatch 和 GCP 操作日志记录,以前称为 Stackdriver)和 Blob 存储处理程序(例如 S3、GCS、WASB)。

对于 Blob 存储处理程序,根据任务的状态,日志可能位于许多不同的位置和多个不同的文件中。因此,我们需要检查所有位置并交错我们找到的内容。为此,我们需要能够解析每行的 timestamp。如果您使用的是自定义格式化程序,您可能需要通过在 Airflow 设置 [logging] interleave_timestamp_parser 中提供可调用名称来覆盖默认解析器。

对于流处理程序,无论任务阶段或执行位置如何,所有日志消息都可以使用相同的标识符发送到日志记录服务,因此通常无需检查多个来源并进行交错。

故障排除

如果您想检查当前设置了哪个任务处理程序,可以使用 airflow info 命令,如下例所示。

$ airflow info

Apache Airflow
version                | 2.9.0.dev0
executor               | LocalExecutor
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn       | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder            | /files/dags
plugins_folder         | /root/airflow/plugins
base_log_folder        | /root/airflow/logs
remote_base_log_folder |

[skipping the remaining outputs for brevity]

上面的 airflow info 输出被截断,仅显示与日志记录配置相关的部分。您还可以运行 airflow config list 来检查日志记录配置选项是否具有有效值。

高级配置

你可以配置高级功能 - 包括添加你自己的自定义任务日志处理程序(也包括所有 airflow 组件的日志处理程序),以及为操作符、钩子和任务创建自定义日志处理程序。

从工作人员和触发器提供日志

大多数任务处理程序在任务完成后发送日志。为了实时查看日志,Airflow 在以下情况下启动一个 HTTP 服务器来提供日志

  • 如果使用 SequentialExecutorLocalExecutor,则在 airflow scheduler 运行时。

  • 如果使用 CeleryExecutor,则在 airflow worker 运行时。

在触发器中,除非使用选项 --skip-serve-logs 启动服务,否则会提供日志。

该服务器在 [logging] 部分中的 worker_log_server_port 选项指定的端口上运行,而触发器的选项为 triggerer_log_server_port。默认值分别为 8793 和 8794。Web 服务器和工作人员之间的通信由 [webserver] 部分中的 secret_key 选项指定的密钥签名。你必须确保密钥匹配,以便通信能够顺利进行。

我们使用 Gunicorn 作为 WSGI 服务器。可以使用 GUNICORN_CMD_ARGS 环境变量覆盖其配置选项。有关详细信息,请参阅 Gunicorn 设置

实现自定义文件任务处理程序

注意

这是一个高级主题,大多数用户应该能够直接使用 编写日志 中的现有处理程序。

在我们的提供程序中,我们有各种各样的选项,涵盖所有主要云提供商。但是,如果你需要使用不同的服务实现日志记录,并且你决定实现自定义 FileTaskHandler,那么有一些设置需要注意,尤其是在触发器日志记录的上下文中。

触发器需要改变日志记录的设置方式。与任务不同,许多触发器在同一进程中运行,而且由于触发器在 asyncio 中运行,因此我们必须注意不要通过日志记录处理程序引入阻塞调用。并且由于处理程序行为的变化(有些写入文件,有些上传到 Blob 存储,有些在到达时通过网络发送消息,有些在进程中执行),我们需要某种方式让触发器知道如何使用它们。

为了实现这一点,我们有一些属性可以设置在处理程序上,无论是实例还是类。这些参数不尊重继承,因为 FileTaskHandler 的子类可能在相关特征上与它不同。这些参数如下所述

  • trigger_should_wrap:控制此处理程序是否应由 TriggerHandlerWrapper 包装。当处理程序的每个实例创建一个文件处理程序并向其写入所有消息时,这是必需的。

  • trigger_should_queue:控制触发器是否应在事件循环和处理程序之间放置一个 QueueListener,以确保处理程序中的阻塞 IO 不会中断事件循环。

  • trigger_send_end_marker:控制当触发器完成时是否应向记录器发送 END 信号。它用于告诉包装器关闭并删除特定于刚刚完成的触发器的单个文件处理程序。

  • trigger_supported:如果 trigger_should_wraptrigger_should_queue 不是 True,我们通常认为处理程序不支持触发器。但在这种情况下,如果处理程序将 trigger_supported 设置为 True,那么我们仍会将处理程序在触发器启动时移动到根目录,以便它处理触发器消息。从本质上讲,这对于“本机”支持触发器的处理程序应该是正确的。StackdriverTaskHandler 就是这样的一个例子。

此条目有帮助吗?