模块管理

Airflow 允许你在 DAG 和 Airflow 配置中使用你自己的 Python 模块。以下文章将描述如何创建你自己的模块,以便 Airflow 可以正确加载它,以及诊断模块未正确加载时的问题。

通常,你希望在 Airflow 部署中使用你自己的 Python 代码,例如通用代码、库,你可能希望使用共享的 Python 代码生成 DAG,并拥有多个 DAG Python 文件。

你可以通过以下方式之一实现

  • 将你的模块添加到 Airflow 自动添加到 PYTHONPATH 的文件夹之一

  • 将你存放代码的额外文件夹添加到 PYTHONPATH

  • 将你的代码打包成一个 Python 包,并与 Airflow 一起安装。

下一章将概述 Python 如何加载包和模块,并深入探讨上述三种可能性的具体细节。

Python 中包/模块的加载方式

Python 尝试从中加载模块的目录列表由变量 sys.path 给出。Python 确实尝试根据操作系统、Python 的安装方式以及使用的 Python 版本来智能地确定此变量的内容。

你可以通过运行如下示例中的交互式终端来检查当前 Python 环境的此变量内容

>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/.pyenv/versions/3.8.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.8/site-packages']

sys.path 在程序启动期间初始化。首先优先考虑当前目录,即 path[0] 是包含用于调用当前脚本的目录,如果它是交互式 shell,则为空字符串。第二个优先级是 PYTHONPATH(如果提供),然后是安装相关的默认路径,由 site 模块管理。

也可以在 Python 会话期间通过简单地使用 append 修改 sys.path(例如,sys.path.append("/path/to/custom/package"))。Python 将在添加新的路径后开始在新的路径中搜索包。Airflow 利用此功能,如 将目录添加到 PYTHONPATH部分所述。

在变量 sys.path 中有一个目录 site-packages,其中包含已安装的外部包,这意味着你可以使用 pipanaconda 安装包,并在 Airflow 中使用它们。在下一节中,你将学习如何创建你自己的简单可安装包,以及如何使用环境变量 PYTHONPATH指定要添加到 sys.path 的其他目录。

还要确保将 init 文件添加到你的文件夹

包的典型结构

这是你可能在 dags 文件夹中拥有的示例结构

<DIRECTORY ON PYTHONPATH>
| .airflowignore  -- only needed in ``dags`` folder, see below
| -- my_company
              | __init__.py
              | common_package
              |              |  __init__.py
              |              | common_module.py
              |              | subpackage
              |                         | __init__.py
              |                         | subpackaged_util_module.py
              |
              | my_custom_dags
                              | __init__.py
                              | my_dag1.py
                              | my_dag2.py
                              | base_dag.py

在上面的例子中,这些是你导入 python 文件的方式

from my_company.common_package.common_module import SomeClass
from my_company.common_package.subpackage.subpackaged_util_module import AnotherClass
from my_company.my_custom_dags.base_dag import BaseDag

你可以看到你的文件夹根目录下的 .airflowignore 文件。你可以将此文件放在你的 dags 文件夹中,以告知 Airflow 当 Airflow 调度器查找 DAG 时应忽略该文件夹中的哪些文件。它应该包含正则表达式(默认)或 glob 表达式,用于应该忽略的路径。你不需要在 PYTHONPATH 中的任何其他文件夹中拥有该文件(并且你也可以只在其他文件夹中保留共享代码,而不是实际的 DAG)。

在上面的示例中,DAG 仅在 my_custom_dags 文件夹中,common_package 在搜索 DAG 时不应被调度程序扫描,因此我们应该忽略 common_package 文件夹。如果你在那里保留一个基础 DAG,并且 my_dag1.pymy_dag2.py 从该基础 DAG 派生,你还需要忽略 base_dag.py。那么你的 .airflowignore 应该如下所示

my_company/common_package/.*
my_company/my_custom_dags/base_dag\.py

如果 DAG_IGNORE_FILE_SYNTAX 设置为 glob,则等效的 .airflowignore 文件将是

my_company/common_package/
my_company/my_custom_dags/base_dag.py

Airflow 中内置的 PYTHONPATH 条目

Airflow 在动态运行时,会将三个目录添加到 sys.path

  • dags 文件夹:它通过 [core] 部分中的选项 dags_folder 配置。

  • config 文件夹:默认情况下,通过设置 AIRFLOW_HOME 变量({AIRFLOW_HOME}/config)配置。

  • plugins 文件夹:它通过 [core] 部分中的选项 plugins_folder 配置。

注意

Airflow 2 中的 DAGS 文件夹不应与 Web 服务器共享。虽然你可以这样做,但与 Airflow 1.10 不同,Airflow 不期望 Web 服务器中存在 DAGS 文件夹。事实上,与 Web 服务器共享 dags 文件夹存在一定的安全风险,因为它意味着编写 DAG 的人可以编写 Web 服务器能够执行的代码(理想情况下,Web 服务器永远不应运行可以由编写 DAG 的用户修改的代码)。因此,如果你需要与 Web 服务器共享一些代码,强烈建议你通过 configplugins 文件夹或通过安装的 Airflow 包(见下文)共享。这些文件夹通常由与 DAG 文件夹(通常是数据科学家)不同的用户(管理员/DevOps)管理和访问,因此它们被认为是安全的,因为它们是 Airflow 安装配置的一部分,并由管理安装的人员控制。

代码命名的最佳实践

当你导入代码时,有一些需要注意的陷阱。

有时,您可能会看到 Airflow 或您使用的其他库代码引发的异常,提示 模块 'X' 没有属性 'Y'。这通常是由于您的 PYTHONPATH 顶层中有一个名为“X”的模块或包,它被导入而不是原始代码期望的模块。

您应该始终为您的包和模块使用唯一的名称,并且下面描述了一些方法来确保强制执行唯一性。

使用唯一的顶级包名

最重要的是,避免为直接添加到 PYTHONPATH 顶层的任何内容使用通用名称。例如,如果您将包含 __init__.pyairflow 文件夹添加到 DAGS_FOLDER,它将与 Airflow 包冲突,并且您将无法从 Airflow 包导入任何内容。类似地,不要直接在那里添加 airflow.py 文件。标准库包使用的常见名称,如 multiprocessinglogging 等,也不应作为顶层使用——无论是作为包(即带有 __init__.py 的文件夹)还是作为模块(即 .py 文件)。

这同样适用于 configplugins 文件夹,它们也位于 PYTHONPATH 中,以及您手动添加到 PYTHONPATH 的任何内容(详见后续章节)。

建议您始终将 DAG/公共文件放在对于您的部署是唯一的子包中(在下面的示例中为 my_company)。为将与系统中已存在的其他包冲突的文件夹使用通用名称太容易了。例如,如果您创建 airflow/operators 子文件夹,则它将无法访问,因为 Airflow 已经有一个名为 airflow.operators 的包,并且在导入 from airflow.operators 时,它会查找那里。

不要使用相对导入

永远不要使用在 Python 3 中添加的相对导入(以 . 开头)。

my_dag1.py 中执行类似操作很诱人

from .base_dag import BaseDag  # NEVER DO THAT!!!!

您应该使用完整路径(从添加到 PYTHONPATH 的目录开始)导入此类共享 DAG

from my_company.my_custom_dags.base_dag import BaseDag  # This is cool

相对导入是违反直觉的,并且根据您启动 python 代码的方式,它们的行为可能会有所不同。在 Airflow 中,同一个 DAG 文件可能会在不同的上下文中进行解析(由调度程序、工作程序或在测试期间),在这些情况下,相对导入的行为可能会有所不同。在 Airflow DAG 中导入任何内容时,请始终使用完整的 python 包路径,这将为您节省很多麻烦。您可以在 此 Stack Overflow 线程中阅读有关相对导入注意事项的更多信息。

在包文件夹中添加 __init__.py

当您创建文件夹时,您应该在文件夹中添加 __init__.py 文件作为空文件。虽然在 Python 3 中有一个隐式命名空间的概念,您不必将这些文件添加到文件夹中,但 Airflow 希望这些文件添加到您添加的所有包中。

检查您的 PYTHONPATH 加载配置

您还可以使用 airflow info 命令查看确切的路径,并像使用环境变量 PYTHONPATH 指定的目录一样使用它们。此命令指定的 sys.path 变量的内容示例如下

Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/home/rootcss/venvs/airflow/lib/python3.8/site-packages:/home/rootcss/airflow/dags:/home/rootcss/airflow/config:/home/rootcss/airflow/plugins]

以下是 airflow info 命令的示例输出

Apache Airflow: 2.0.0b3

System info
OS              | Linux
architecture    | x86_64
uname           | uname_result(system='Linux', node='85cd7ab7018e', release='4.19.76-linuxkit', version='#1 SMP Tue May 26 11:42:35 UTC 2020', machine='x86_64', processor='')
locale          | ('en_US', 'UTF-8')
python_version  | 3.8.6 (default, Nov 25 2020, 02:47:44)  [GCC 8.3.0]
python_location | /usr/local/bin/python

Tools info
git             | git version 2.20.1
ssh             | OpenSSH_7.9p1 Debian-10+deb10u2, OpenSSL 1.1.1d  10 Sep 2019
kubectl         | NOT AVAILABLE
gcloud          | NOT AVAILABLE
cloud_sql_proxy | NOT AVAILABLE
mysql           | mysql  Ver 8.0.22 for Linux on x86_64 (MySQL Community Server - GPL)
sqlite3         | 3.27.2 2019-02-25 16:06:06 bd49a8271d650fa89e446b42e513b595a717b9212c91dd384aab871fc1d0alt1
psql            | psql (PostgreSQL) 11.9 (Debian 11.9-0+deb10u1)

Paths info
airflow_home    | /root/airflow
system_path     | /usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
python_path     | /usr/local/bin:/opt/airflow:/files/plugins:/usr/local/lib/python38.zip:/usr/local/lib/python3.8:/usr/
                | local/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/site-packages:/files/dags:/root/airflow/conf
                | ig:/root/airflow/plugins
airflow_on_path | True

Config info
executor             | LocalExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn     | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder          | /files/dags
plugins_folder       | /root/airflow/plugins
base_log_folder      | /root/airflow/logs

Providers info
apache-airflow-providers-amazon           | 1.0.0b2
apache-airflow-providers-apache-cassandra | 1.0.0b2
apache-airflow-providers-apache-druid     | 1.0.0b2
apache-airflow-providers-apache-hdfs      | 1.0.0b2
apache-airflow-providers-apache-hive      | 1.0.0b2

将目录添加到 PYTHONPATH

您可以使用环境变量 PYTHONPATH 指定要添加到 sys.path 的其他目录。使用以下命令提供项目根目录的路径来启动 python shell

PYTHONPATH=/home/arch/projects/airflow_operators python

sys.path 变量将如下所示

>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/projects/airflow_operators'
 '/home/arch/.pyenv/versions/3.8.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.8/site-packages']

正如我们所见,我们提供的目录现在已添加到路径中,让我们尝试导入该包

>>> import airflow_operators
Hello from airflow_operators
>>>

我们还可以将 PYTHONPATH 变量与 airflow 命令一起使用。例如,如果我们运行以下 airflow 命令

PYTHONPATH=/home/arch/projects/airflow_operators airflow info

我们将看到 Python PATH 使用我们提到的 PYTHONPATH 值进行更新,如下所示

Python PATH: [/home/arch/venv/bin:/home/arch/projects/airflow_operators:/usr/lib/python38.zip:/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/home/arch/venv/lib/python3.8/site-packages:/home/arch/airflow/dags:/home/arch/airflow/config:/home/arch/airflow/plugins]

在 Python 中创建包

这是添加自定义代码的最有组织的方式。得益于使用包,您可以组织您的版本控制方法,控制安装了哪些版本的共享代码,并以受控的方式将代码部署到所有实例和容器中——所有这些都由系统管理员/DevOps 而不是 DAG 编写者完成。当您有一个单独的团队来管理此共享代码时,它通常是合适的,但是如果您了解 python 的方法,您也可以在较小的部署中以这种方式分发您的代码。您还可以将您的 插件提供程序包 作为 python 包安装,因此学习如何构建您的包很方便。

以下是如何创建您的包

1. 在开始之前,选择并安装您将使用的构建/打包工具,理想情况下,它应该符合 PEP-621 标准,以便能够轻松切换到不同的工具。流行的选择是 setuptools、poetry、hatch、flit。

  1. 确定您何时创建自己的包。创建包目录——在本例中,我们将其命名为 airflow_operators

mkdir airflow_operators
  1. 在包内部创建文件 __init__.py 并添加以下代码

print("Hello from airflow_operators")

当我们导入这个包时,它应该打印以上消息。

4. 创建 pyproject.toml 并使用您选择的构建工具配置填充它。请参阅 pyproject.toml 规范

  1. 使用您选择的工具构建您的项目。例如,对于 hatch,它可以是

hatch build -t wheel

这将在您的 dist 文件夹中创建 .whl 文件

  1. 使用 pip 安装 .whl 文件

pip install dist/airflow_operators-0.0.0-py3-none-any.whl
  1. 该包现在可以使用了!

>>> import airflow_operators
Hello from airflow_operators
>>>

可以使用 pip 命令删除该包

pip uninstall airflow_operators

有关如何创建和发布 python 包的更多详细信息,请参阅 打包 Python 项目

此条目是否有帮助?