Airflow Summit 2025 即将于 10 月 07 日至 09 日举行。立即注册获取早鸟票!

任务日志

Airflow 以一种允许您在 Airflow UI 中单独查看每个任务日志的方式写入任务日志。核心 Airflow 提供了一个接口 FileTaskHandler,用于将任务日志写入文件,并包含一个在任务运行时从 worker 提供日志的机制。Apache Airflow 社区还发布了许多服务的提供者 (提供者),其中一些提供程序扩展了 Apache Airflow 的日志记录能力。您可以在写入日志中查看所有这些提供者。

使用 S3、GCS、WASB、HDFS 或 OSS 远程日志服务时,可以将本地日志文件上传到远程位置后删除,通过设置配置项来实现

[logging]
remote_logging = True
remote_base_log_folder = schema://path/to/remote/log
delete_local_logs = True

配置日志

对于默认处理器 FileTaskHandler,您可以使用 base_log_folderairflow.cfg 中指定日志文件存放的目录。默认情况下,日志存放在 AIRFLOW_HOME 目录中。

注意

有关配置设置的更多信息,请参阅设置配置选项

任务日志文件的命名遵循默认模式

  • 对于普通任务:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log

  • 对于动态映射的任务:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log

这些模式可以通过log_filename_template进行调整。

此外,您可以提供一个远程位置来存储当前日志和备份。

从代码写入任务日志

Airflow 使用标准的 Python logging 框架来写入日志,并且在任务执行期间,根 logger 配置为写入任务日志。

大多数 operator 会自动将日志写入任务日志。这是因为它们有一个 log logger,您可以使用它来写入任务日志。这个 logger 由所有 operator 继承的 LoggingMixin 创建和配置。此外,由于根 logger 的处理,任何将日志传播到根的标准 logger(使用默认设置)也会写入任务日志。

因此,如果您想从自己的自定义代码向任务日志写入日志,可以执行以下任何操作

  • 使用 BaseOperator 中的 self.log logger 写入日志

  • 使用标准 print 语句打印到 stdout(不推荐,但在某些情况下可能有用)

  • 使用标准的 logger 方法,即使用 Python 模块名创建一个 logger,然后使用它向任务日志写入日志

这是 logger 在 Python 代码中直接使用的常用方式

import logging

logger = logging.getLogger(__name__)
logger.info("This is a log message")

日志行分组

在版本 2.9.0 中添加。

与 CI 流水线一样,Airflow 日志也可能非常大且难以阅读。因此,有时对日志区域进行分组并提供文本区域折叠以隐藏不相关内容会很有用。因此,Airflow 实现了一种兼容的日志消息分组方式,类似于 GithubAzure DevOps,以便可以折叠文本区域。所实现的方案兼容,因此在 CI 中生成输出的工具可以直接在 Airflow 中利用相同的体验。

通过添加具有起始和结束位置的日志标记,如下所示的日志消息可以分组

print("Here is some standard text.")
print("::group::Non important details")
print("bla")
print("debug messages...")
print("::endgroup::")
print("Here is again some standard text.")

在 web UI 中显示日志时,日志的显示将会被精简

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯈ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

如果您点击日志文本标签,将显示详细的日志行。

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯆ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - bla
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - debug messages...
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯅⯅⯅ Log group end
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

日志交错

Airflow 的远程任务日志处理器大致可分为两类:流式处理器(如 ElasticSearch、AWS Cloudwatch 和 GCP 操作日志,即原先的 stackdriver)和 Blob 存储处理器(如 S3、GCS、WASB)。

对于 Blob 存储处理器,根据任务的状态,日志可能位于许多不同的位置和多个不同的文件中。因此,我们需要检查所有位置并将找到的内容交错。要做到这一点,我们需要能够解析每一行的时间戳。如果您正在使用自定义格式化程序,您可能需要在 Airflow 设置 [logging] interleave_timestamp_parser 处提供一个可调用名称来覆盖默认解析器。

对于流式处理器,无论任务阶段或执行位置如何,所有日志消息都可以使用相同的标识符发送到日志服务,因此一般来说无需检查多个源并进行交错。

故障排除

如果您想检查当前设置的是哪个任务处理器,可以使用 airflow info 命令,如下例所示。

$ airflow info

Apache Airflow
version                | 2.9.0.dev0
executor               | LocalExecutor
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn       | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder            | /files/dags
plugins_folder         | /root/airflow/plugins
base_log_folder        | /root/airflow/logs
remote_base_log_folder |

[skipping the remaining outputs for brevity]

上面的 airflow info 输出已被截断,仅显示与日志配置相关的部分。您还可以运行 airflow config list 来检查日志配置选项是否具有有效值。

高级配置

您可以配置高级功能——包括添加您自己的自定义任务日志处理器(以及所有 Airflow 组件的日志处理器),并为每个 operator、hook 和任务创建自定义日志处理器。

从 Worker 和 Triggerer 提供日志

大多数任务处理器在任务完成后发送日志。为了实时查看日志,Airflow 在以下情况下启动 HTTP 服务器来提供日志

  • 如果使用 LocalExecutor,则在 airflow scheduler 运行时提供。

  • 如果使用 CeleryExecutor,则在 airflow worker 运行时提供。

在 triggerer 中,除非服务以选项 --skip-serve-logs 启动,否则会提供日志。

服务器运行在 [logging] 部分的 worker_log_server_port 选项指定的端口上,triggerer 则运行在 triggerer_log_server_port 选项指定的端口上。默认值分别为 8793 和 8794。Webserver 和 worker 之间的通信使用 [webserver] 部分的 secret_key 选项指定的密钥进行签名。您必须确保密钥匹配,以便通信顺利进行。

我们使用 Gunicorn 作为 WSGI 服务器。其配置选项可以通过 GUNICORN_CMD_ARGS 环境变量覆盖。详情请参阅 Gunicorn 设置

实现自定义文件任务处理器

注意

这是一个高级主题,大多数用户应该可以直接使用写入日志中的现有处理器。

在我们的提供者中,我们为所有主要的云提供商提供了丰富的选项。但如果您需要使用不同的服务实现日志记录,并决定实现自定义 FileTaskHandler,则需要注意一些设置,特别是在 trigger 日志记录的上下文中。

Trigger 要求改变日志设置的方式。与任务不同,许多 trigger 在同一进程中运行,并且由于 trigger 运行在 asyncio 中,我们必须注意不要通过日志处理器引入阻塞调用。并且由于处理器的行为各异(一些写入文件,一些上传到 Blob 存储,一些在消息到达时通过网络发送消息,一些在线程中执行此操作),我们需要有一种方法让 triggerer 知道如何使用它们。

为了实现这一点,我们有一些可以在处理器实例或类上设置的属性。这些参数不遵循继承,因为 FileTaskHandler 的子类在相关特性上可能与其不同。这些参数描述如下

  • trigger_should_wrap:控制此处理器是否应由 TriggerHandlerWrapper 包装。当处理器的每个实例都创建一个文件处理器并向其写入所有消息时,这是必要的。

  • trigger_should_queue:控制 triggerer 是否应在事件循环和处理器之间放置一个 QueueListener,以确保处理器中的阻塞 IO 不会中断事件循环。

  • trigger_send_end_marker:控制当 trigger 完成时是否应向 logger 发送 END 信号。它用于告诉包装器关闭并移除刚完成的 trigger 特定的个体文件处理器。

  • trigger_supported:如果 trigger_should_wraptrigger_should_queue 不是 True,我们通常假定处理器不支持 trigger。但在这种情况下,如果处理器的 trigger_supported 设置为 True,那么我们仍然会在 triggerer 启动时将处理器移动到根,以便它将处理 trigger 消息。本质上,这对于“原生”支持 trigger 的处理器应该为 true。例如 StackdriverTaskHandler 就是这样的例子。

此条目有帮助吗?