高级日志配置
并非所有配置选项都可以在 airflow.cfg 文件中获得。该配置文件描述了如何为任务配置日志,因为任务生成的日志默认不仅记录在单独的文件中,还必须能够通过 Web 服务器访问。
默认情况下,标准的 Airflow 组件日志写入 $AIRFLOW_HOME/logs 目录,但您也可以通过覆盖 Python 日志记录器配置(可通过提供自定义日志配置对象进行配置)来自定义和配置它。您还可以为特定的 operators 和 tasks 创建并使用日志配置。
某些配置选项需要覆盖日志配置类。您可以通过复制 Airflow 的默认配置并对其进行修改以满足需求来实现。
默认配置可在 airflow_local_settings.py 模板 中查看,您可以看到其中使用的日志记录器和处理器。
有关如何配置本地设置的详细信息,请参见 配置本地设置。
除通过 airflow.cfg 可配置的自定义日志记录器和处理器外,Airflow 的日志方法遵循常规的 Python 日志约定,即 Python 对象记录到遵循 <package>.<module_name> 命名约定的日志记录器。
您可以在 Python 日志文档 中了解更多关于标准 Python 日志类(Logger、Handler、Formatter)的信息。
创建自定义日志类
可以通过在 airflow.cfg 文件中的 logging_config_class 选项来配置您的日志类。该配置应指定兼容 logging.config.dictConfig() 的导入路径。如果您的文件位于标准的导入位置,则应设置 PYTHONPATH 环境变量。
按照以下步骤启用自定义日志配置类
首先将环境变量设置为已知目录,例如
~/airflow/export PYTHONPATH=~/airflow/
创建一个目录来存放配置文件,例如
~/airflow/config创建名为
~/airflow/config/log_config.py的文件,内容如下from copy import deepcopy from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
在文件末尾,添加代码以修改默认的字典配置。
更新
$AIRFLOW_HOME/airflow.cfg,使其包含[logging] logging_config_class = log_config.LOGGING_CONFIG
如果您计划在启用远程日志的情况下仅扩展/更新配置,也可以将 logging_config_class 与远程日志一起使用。此时深拷贝的字典会包含为您生成的远程日志配置,您的修改将在远程日志配置添加之后生效。
[logging] remote_logging = True logging_config_class = log_config.LOGGING_CONFIG
重启应用程序。
有关 Python 和 Airflow 如何管理模块的详细信息,请参见 模块管理。
注意
您可以覆盖组件的标准日志以及 “task” 日志的处理方式。
Operators、Hooks 和 Tasks 的自定义日志记录器
您可以创建自定义日志处理器并将其应用于特定的 Operators、Hooks 和 tasks。默认情况下,Operators 和 Hooks 的日志记录器是 airflow.task 日志记录器的子记录器:它们分别遵循命名约定 airflow.task.operators.<package>.<module_name> 和 airflow.task.hooks.<package>.<module_name>。在 创建自定义日志类 之后,您可以为它们分配特定的日志记录器。
针对 SQLExecuteQueryOperator 和 HttpHook 的自定义日志示例
from copy import deepcopy from pydantic.utils import deep_update from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG LOGGING_CONFIG = deep_update( deepcopy(DEFAULT_LOGGING_CONFIG), { "loggers": { "airflow.task.operators.airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator": { "handlers": ["task"], "level": "DEBUG", "propagate": True, }, "airflow.task.hooks.airflow.providers.http.hooks.http.HttpHook": { "handlers": ["task"], "level": "WARNING", "propagate": False, }, } }, )
您还可以使用 logger_name 属性为 DAG 的任务设置自定义名称。如果多个任务使用相同的 Operator,但您希望禁用其中某些任务的日志记录,这将非常有用。
自定义日志记录器名称示例
# In your Dag file SQLExecuteQueryOperator(..., logger_name="sql.big_query") # In your custom `log_config.py` LOGGING_CONFIG = deep_update( deepcopy(DEFAULT_LOGGING_CONFIG), { "loggers": { "airflow.task.operators.sql.big_query": { "handlers": ["task"], "level": "WARNING", "propagate": True, }, } }, )
如果您想限制任务的日志大小,可以添加 handlers.task.max_bytes 参数。
限制任务大小的示例
from copy import deepcopy from pydantic.utils import deep_update from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG LOGGING_CONFIG = deep_update( deepcopy(DEFAULT_LOGGING_CONFIG), { "handlers": { "task": {"max_bytes": 104857600, "backup_count": 1} # 100MB and keep 1 history rotate log. } }, )