DAG 捆绑包¶
DAG 捆绑包是包含一个或多个 DAG、文件及其相关文件(例如其他 Python 脚本、配置文件或其他资源)的集合。DAG 捆绑包可以从各种位置获取 DAG,例如本地目录、Git 仓库或其他外部系统。部署管理员也可以编写自己的 DAG 捆绑包类来支持自定义来源。您还可以在一个 Airflow 部署中定义多个 DAG 捆绑包,以便更好地组织您的 DAG。通过将捆绑包保持在更高级别,它允许对 DAG 运行所需的一切进行版本控制。
这与 Airflow 2 或更早版本中的 dags folder 相似,但功能更强大。在早期版本中,DAG 必须位于本地磁盘上的一个位置,并且将 DAG 获取到该位置完全是部署管理员的责任。
由于 DAG 捆绑包支持版本控制,它们还允许 Airflow 使用特定版本的 DAG 捆绑包运行任务,从而使 DAG 运行在整个执行过程中使用相同的代码,即使 DAG 在运行中途被更新。
为何 DAG 捆绑包很重要?¶
版本控制: 通过支持版本控制,DAG 捆绑包允许 DAG 运行在整个执行过程中使用相同的代码,即使 DAG 在运行中途被更新。
可伸缩性: 通过 DAG 捆绑包,Airflow 可以将大量 DAG 组织成逻辑单元,从而高效地管理它们。
灵活性: DAG 捆绑包能够与外部系统(例如 Git 仓库)无缝集成,以获取 DAG。
DAG 捆绑包的类型¶
Airflow 支持多种类型的 DAG 捆绑包,每种类型都适用于特定的用例
- airflow.dag_processing.bundles.local.LocalDagBundle
这些捆绑包引用包含 DAG 文件的本地目录。它们非常适合开发和测试环境,但不支持捆绑包的版本控制,这意味着任务始终使用最新代码运行。
- airflow.providers.git.bundles.git.GitDagBundle
这些捆绑包与 Git 仓库集成,允许 Airflow 直接从仓库中获取 DAG。
配置 DAG 捆绑包¶
DAG 捆绑包在 dag_bundle_config_list 中配置。您可以在此处添加一个或多个 DAG 捆绑包。
默认情况下,Airflow 会添加一个本地 DAG 捆绑包,这与旧的 dags folder 相同。这样做是为了向后兼容,如果您不想使用它,可以将其移除。您也可以保留它并添加其他 DAG 捆绑包,例如 git dag bundle。
例如,在您的 airflow.cfg
文件中添加多个 DAG 捆绑包
[dag_processor]
dag_bundle_config_list = [
{
"name": "my_git_repo",
"classpath": "airflow.dag_processing.bundles.git.GitDagBundle",
"kwargs": {"tracking_ref": "main", "git_conn_id": "my_git_conn"}
},
{
"name": "dags-folder",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {}
}
]
注意
空格,特别是在最后一行上的空格,很重要,这样多行值才能正常工作。更多详细信息可以在 configparser 文档中找到。
您还可以通过在 kwargs 中传递参数来覆盖每个 DAG 捆绑包的 refresh_interval。这控制了 DAG 处理器刷新或查找 DAG 捆绑包中新文件的频率。
编写自定义 DAG 捆绑包¶
当您通过扩展 BaseDagBundle
类来实现您自己的 DAG 捆绑包时,有几个方法是必须实现的。以下是帮助您实现自定义 DAG 捆绑包的指南。
抽象方法¶
以下方法是抽象方法,必须在您的自定义捆绑包类中实现
- path
此属性应返回一个
Path
对象,指向存储此捆绑包的 DAG 文件的目录。Airflow 使用此属性来定位要处理的 DAG 文件。- get_current_version
此方法应以字符串形式返回捆绑包的当前版本。Airflow 稍后会将此版本传递给
__init__
方法,以便在运行任务时再次获取此版本的捆绑包。如果不支持版本控制,则应返回None
。- refresh
此方法应处理从其来源刷新捆绑包内容(例如,从远程仓库拉取最新更改)。DAG 处理器会定期使用此方法,以确保捆绑包是最新的。
可选方法¶
除了抽象方法之外,您可以选择覆盖以下方法来自定义捆绑包的行为
- __init__
可以扩展此方法以使用额外参数初始化捆绑包,例如
GitDagBundle
的tracking_ref
。它还应调用父类的__init__
方法以确保正确初始化。在此方法中应避免执行耗时操作,例如网络调用,以防止捆绑包实例化期间出现延迟;耗时操作应改在initialize
方法中执行。- initialize
此方法在 DAG 处理器或 worker 首次使用捆绑包之前调用。它允许您仅在访问捆绑包内容时执行耗时操作。
- view_url
此方法应以字符串形式返回一个 URL,用于在外部系统(例如 Git 仓库的 Web 界面)上查看捆绑包。
其他注意事项¶
版本控制: 如果您的捆绑包支持版本控制,请确保实现了
initialize
、get_current_version
和refresh
方法来处理特定版本的逻辑。并发性: worker 可能同时创建多个捆绑包,并且不会对捆绑包对象的调用进行序列化。因此,如果底层技术存在问题,捆绑包类必须处理锁定。例如,如果您正在克隆 git 仓库,捆绑包类负责锁定,以确保每次只有一个捆绑包对象进行克隆。基类中有一个
lock
方法,如有必要可用于此目的。