Airflow 峰会 2025 将于 10 月 07-09 日举行。立即注册获取早鸟票!

DAG 捆绑包

DAG 捆绑包是包含一个或多个 DAG、文件及其相关文件(例如其他 Python 脚本、配置文件或其他资源)的集合。DAG 捆绑包可以从各种位置获取 DAG,例如本地目录、Git 仓库或其他外部系统。部署管理员也可以编写自己的 DAG 捆绑包类来支持自定义来源。您还可以在一个 Airflow 部署中定义多个 DAG 捆绑包,以便更好地组织您的 DAG。通过将捆绑包保持在更高级别,它允许对 DAG 运行所需的一切进行版本控制。

这与 Airflow 2 或更早版本中的 dags folder 相似,但功能更强大。在早期版本中,DAG 必须位于本地磁盘上的一个位置,并且将 DAG 获取到该位置完全是部署管理员的责任。

由于 DAG 捆绑包支持版本控制,它们还允许 Airflow 使用特定版本的 DAG 捆绑包运行任务,从而使 DAG 运行在整个执行过程中使用相同的代码,即使 DAG 在运行中途被更新。

为何 DAG 捆绑包很重要?

  • 版本控制: 通过支持版本控制,DAG 捆绑包允许 DAG 运行在整个执行过程中使用相同的代码,即使 DAG 在运行中途被更新。

  • 可伸缩性: 通过 DAG 捆绑包,Airflow 可以将大量 DAG 组织成逻辑单元,从而高效地管理它们。

  • 灵活性: DAG 捆绑包能够与外部系统(例如 Git 仓库)无缝集成,以获取 DAG。

DAG 捆绑包的类型

Airflow 支持多种类型的 DAG 捆绑包,每种类型都适用于特定的用例

airflow.dag_processing.bundles.local.LocalDagBundle

这些捆绑包引用包含 DAG 文件的本地目录。它们非常适合开发和测试环境,但不支持捆绑包的版本控制,这意味着任务始终使用最新代码运行。

airflow.providers.git.bundles.git.GitDagBundle

这些捆绑包与 Git 仓库集成,允许 Airflow 直接从仓库中获取 DAG。

配置 DAG 捆绑包

DAG 捆绑包在 dag_bundle_config_list 中配置。您可以在此处添加一个或多个 DAG 捆绑包。

默认情况下,Airflow 会添加一个本地 DAG 捆绑包,这与旧的 dags folder 相同。这样做是为了向后兼容,如果您不想使用它,可以将其移除。您也可以保留它并添加其他 DAG 捆绑包,例如 git dag bundle。

例如,在您的 airflow.cfg 文件中添加多个 DAG 捆绑包

[dag_processor]
dag_bundle_config_list = [
    {
      "name": "my_git_repo",
      "classpath": "airflow.dag_processing.bundles.git.GitDagBundle",
      "kwargs": {"tracking_ref": "main", "git_conn_id": "my_git_conn"}
    },
    {
      "name": "dags-folder",
      "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
      "kwargs": {}
    }
  ]

注意

空格,特别是在最后一行上的空格,很重要,这样多行值才能正常工作。更多详细信息可以在 configparser 文档中找到。

您还可以通过在 kwargs 中传递参数来覆盖每个 DAG 捆绑包的 refresh_interval。这控制了 DAG 处理器刷新或查找 DAG 捆绑包中新文件的频率。

编写自定义 DAG 捆绑包

当您通过扩展 BaseDagBundle 类来实现您自己的 DAG 捆绑包时,有几个方法是必须实现的。以下是帮助您实现自定义 DAG 捆绑包的指南。

抽象方法

以下方法是抽象方法,必须在您的自定义捆绑包类中实现

path

此属性应返回一个 Path 对象,指向存储此捆绑包的 DAG 文件的目录。Airflow 使用此属性来定位要处理的 DAG 文件。

get_current_version

此方法应以字符串形式返回捆绑包的当前版本。Airflow 稍后会将此版本传递给 __init__ 方法,以便在运行任务时再次获取此版本的捆绑包。如果不支持版本控制,则应返回 None

refresh

此方法应处理从其来源刷新捆绑包内容(例如,从远程仓库拉取最新更改)。DAG 处理器会定期使用此方法,以确保捆绑包是最新的。

可选方法

除了抽象方法之外,您可以选择覆盖以下方法来自定义捆绑包的行为

__init__

可以扩展此方法以使用额外参数初始化捆绑包,例如 GitDagBundletracking_ref。它还应调用父类的 __init__ 方法以确保正确初始化。在此方法中应避免执行耗时操作,例如网络调用,以防止捆绑包实例化期间出现延迟;耗时操作应改在 initialize 方法中执行。

initialize

此方法在 DAG 处理器或 worker 首次使用捆绑包之前调用。它允许您仅在访问捆绑包内容时执行耗时操作。

view_url

此方法应以字符串形式返回一个 URL,用于在外部系统(例如 Git 仓库的 Web 界面)上查看捆绑包。

其他注意事项

  • 版本控制: 如果您的捆绑包支持版本控制,请确保实现了 initializeget_current_versionrefresh 方法来处理特定版本的逻辑。

  • 并发性: worker 可能同时创建多个捆绑包,并且不会对捆绑包对象的调用进行序列化。因此,如果底层技术存在问题,捆绑包类必须处理锁定。例如,如果您正在克隆 git 仓库,捆绑包类负责锁定,以确保每次只有一个捆绑包对象进行克隆。基类中有一个 lock 方法,如有必要可用于此目的。

此条目有帮助吗?