Airflow 峰会 2025 将于 10 月 07-09 日举行。立即注册以获得早鸟票!

高级日志配置

并非所有配置选项都可以在 airflow.cfg 文件中获得。配置文件描述了如何为任务配置日志记录,因为任务生成的日志默认不仅会记录到单独的文件中,还必须可以通过 Web 服务器访问。

默认情况下,标准的 Airflow 组件日志写入到 $AIRFLOW_HOME/logs 目录,但你也可以通过覆盖 Python 日志记录器配置来自定义和按需配置它,这可以通过提供自定义日志配置对象来完成。你还可以为特定的 operators 和 tasks 创建并使用日志配置。

某些配置选项要求覆盖日志配置类。你可以通过复制 Airflow 的默认配置并修改它以满足你的需求来做到这一点。

可以在 airflow_local_settings.py 模板文件 中看到默认配置,并且可以在其中看到使用的日志记录器和处理程序。

参见 配置本地设置 以了解如何配置本地设置的详细信息。

除了可以通过 airflow.cfg 配置的自定义日志记录器和处理程序外,Airflow 中的日志记录方法遵循常见的 Python 日志记录约定,即 Python 对象将日志记录到遵循命名约定的日志记录器中,该约定为 <package>.<module_name>

你可以在 Python 日志记录文档 中阅读更多关于标准 Python 日志记录类(日志记录器、处理程序、格式化程序)的信息。

创建自定义日志类

可以通过 airflow.cfg 文件中的 logging_config_class 选项来配置你的日志类。此配置应指定导入路径,该路径指向与 logging.config.dictConfig() 兼容的配置。如果你的文件位于标准的导入位置,则应设置 PYTHONPATH 环境变量。

按照以下步骤启用自定义日志配置类

  1. 首先将环境变量设置为已知目录,例如 ~/airflow/

    export PYTHONPATH=~/airflow/
    
  2. 创建一个目录用于存储配置文件,例如 ~/airflow/config

  3. 创建一个名为 ~/airflow/config/log_config.py 的文件,内容如下

    from copy import deepcopy
    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
    
    LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
    
  4. 在文件末尾,添加代码来修改默认的字典配置。

  5. 更新 $AIRFLOW_HOME/airflow.cfg 文件以包含以下内容

    [logging]
    logging_config_class = log_config.LOGGING_CONFIG
    

如果你计划在启用远程日志记录的同时仅扩展/更新配置,你也可以将 logging_config_class 与远程日志记录一起使用。此时,深度复制的字典将包含为你生成的远程日志配置,并且你的修改将在远程日志配置添加后应用。

[logging]
remote_logging = True
logging_config_class = log_config.LOGGING_CONFIG
  1. 重启应用程序。

参见 模块管理 以了解 Python 和 Airflow 如何管理模块的详细信息。

注意

你可以覆盖处理组件标准日志和“任务”日志的方式。

Operators、Hooks 和 Tasks 的自定义日志记录器

你可以创建自定义日志处理程序并将其应用于特定的 Operators、Hooks 和 tasks。默认情况下,Operators 和 Hooks 的日志记录器是 airflow.task 日志记录器的子级:它们分别遵循命名约定 airflow.task.operators.<package>.<module_name>airflow.task.hooks.<package>.<module_name>。在 创建自定义日志类 后,你可以为它们分配特定的日志记录器。

SQLExecuteQueryOperatorHttpHook 的自定义日志记录示例

from copy import deepcopy
from pydantic.utils import deep_update
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deep_update(
    deepcopy(DEFAULT_LOGGING_CONFIG),
    {
        "loggers": {
            "airflow.task.operators.airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator": {
                "handlers": ["task"],
                "level": "DEBUG",
                "propagate": True,
            },
            "airflow.task.hooks.airflow.providers.http.hooks.http.HttpHook": {
                "handlers": ["task"],
                "level": "WARNING",
                "propagate": False,
            },
        }
    },
)

你还可以使用 logger_name 属性为 Dag 的任务设置自定义名称。如果多个任务使用相同的 Operator,但你希望为其中一些任务禁用日志记录,这会很有用。

自定义日志记录器名称示例

# In your Dag file
SQLExecuteQueryOperator(..., logger_name="sql.big_query")

# In your custom `log_config.py`
LOGGING_CONFIG = deep_update(
    deepcopy(DEFAULT_LOGGING_CONFIG),
    {
        "loggers": {
            "airflow.task.operators.sql.big_query": {
                "handlers": ["task"],
                "level": "WARNING",
                "propagate": True,
            },
        }
    },
)

如果你想限制任务的日志大小,可以添加 handlers.task.max_bytes 参数。

限制任务大小的示例

from copy import deepcopy
from pydantic.utils import deep_update
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deep_update(
    deepcopy(DEFAULT_LOGGING_CONFIG),
    {
        "handlers": {
            "task": {"max_bytes": 104857600, "backup_count": 1}  # 100MB and keep 1 history rotate log.
        }
    },
)

本条目有帮助吗?