高级日志配置¶
并非所有配置选项都可通过 airflow.cfg
文件获得。 配置文件描述了如何配置任务的日志记录,因为任务生成的日志不仅默认记录在单独的文件中,还必须可以通过 Web 服务器访问。
默认情况下,标准 Airflow 组件日志写入到 $AIRFLOW_HOME/logs
目录,但您也可以通过覆盖可通过提供自定义日志配置对象配置的 Python 日志记录器配置来对其进行自定义和配置。 您还可以为特定的 Operators 和任务创建和使用日志配置。
某些配置选项要求覆盖日志记录配置类。 您可以通过复制 Airflow 的默认配置并修改它以满足您的需求来实现。
可以在airflow_local_settings.py 模板中查看默认配置,您可以在其中看到使用的日志记录器和处理程序。
有关如何配置本地设置的详细信息,请参阅配置本地设置。
除了通过 airflow.cfg
在那里配置的自定义日志记录器和处理程序外,Airflow 中的日志记录方法遵循通常的 Python 日志记录约定,即 Python 对象记录到遵循 <package>.<module_name>
命名约定的日志记录器。
您可以在Python 日志记录文档中阅读有关标准 Python 日志记录类(日志记录器、处理程序、格式化程序)的更多信息。
创建自定义日志记录类¶
可以通过 airflow.cfg
文件中的 logging_config_class
选项来配置日志记录类。 此配置应指定与 logging.config.dictConfig()
兼容的配置的导入路径。 如果您的文件是标准导入位置,则应设置 PYTHONPATH
环境变量。
请按照以下步骤启用自定义日志记录配置类
首先将环境变量设置为已知目录,例如
~/airflow/
export PYTHONPATH=~/airflow/
创建一个目录来存储配置文件,例如
~/airflow/config
创建名为
~/airflow/config/log_config.py
的文件,内容如下from copy import deepcopy from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
在该文件末尾,添加代码以修改默认字典配置。
更新
$AIRFLOW_HOME/airflow.cfg
以包含[logging] logging_config_class = log_config.LOGGING_CONFIG
如果您计划仅通过启用远程日志记录来扩展/更新配置,您还可以将 logging_config_class
与远程日志记录一起使用。 然后,深度复制的字典将包含为您生成的远程日志记录配置,并且您的修改将在添加远程日志记录配置后应用
[logging] remote_logging = True logging_config_class = log_config.LOGGING_CONFIG
重新启动应用程序。
有关 Python 和 Airflow 如何管理模块的详细信息,请参阅模块管理。
注意
您可以覆盖组件的标准日志和“任务”日志的处理方式。
Operators、Hooks 和任务的自定义日志记录器¶
您可以创建自定义日志记录处理程序,并将它们应用于特定的 Operators、Hooks 和任务。 默认情况下,Operators 和 Hooks 日志记录器是 airflow.task
日志记录器的子级:它们分别遵循命名约定 airflow.task.operators.<package>.<module_name>
和 airflow.task.hooks.<package>.<module_name>
。 在创建自定义日志记录类后,您可以为它们分配特定的日志记录器。
用于 SQLExecuteQueryOperator
和 HttpHook
的自定义日志记录示例
from copy import deepcopy from pydantic.utils import deep_update from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG LOGGING_CONFIG = deep_update( deepcopy(DEFAULT_LOGGING_CONFIG), { "loggers": { "airflow.task.operators.airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator": { "handlers": ["task"], "level": "DEBUG", "propagate": True, }, "airflow.task.hooks.airflow.providers.http.hooks.http.HttpHook": { "handlers": ["task"], "level": "WARNING", "propagate": False, }, } }, )
您还可以使用 logger_name
属性为 DAG 的任务设置自定义名称。 如果多个任务使用同一个 Operator,但您想禁用其中一些任务的日志记录,这将非常有用。
自定义日志记录器名称的示例
# In your Dag file SQLExecuteQueryOperator(..., logger_name="sql.big_query") # In your custom `log_config.py` LOGGING_CONFIG = deep_update( deepcopy(DEFAULT_LOGGING_CONFIG), { "loggers": { "airflow.task.operators.sql.big_query": { "handlers": ["task"], "level": "WARNING", "propagate": True, }, } }, )
如果要限制任务的日志大小,可以添加 handlers.task.max_bytes 参数。
限制任务大小的示例
from copy import deepcopy from pydantic.utils import deep_update from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG LOGGING_CONFIG = deep_update( deepcopy(DEFAULT_LOGGING_CONFIG), { "handlers": { "task": { "max_bytes": 104857600, # 100MB } } }, )