Dag 包

Dag 包是一个或多个 DAG、文件以及它们关联文件的集合,例如其他 Python 脚本、配置文件或其他资源。Dag 包可以从各种位置获取 DAG,例如本地目录、Git 仓库或其他外部系统。部署管理员也可以编写自己的 Dag 包类以支持自定义来源。您还可以在 Airflow 部署中定义多个 Dag 包,以便更好地组织您的 DAG。将包保持在更高的层级,可对 Dag 运行所需的所有内容进行版本控制。

这类似于 Airflow 2 或更早版本中的 Dags 文件夹,但功能更强大。之前 Dags 必须放在本地磁盘的同一位置,且将 Dags 放置在那里完全由部署管理员负责。

由于 Dag 包支持版本控制,Airflow 还能使用特定版本的 Dag 包来运行任务,这样即使在运行过程中 Dag 被更新,整个 Dag 运行仍可使用相同的代码。

为什么 Dag 包很重要?

  • 版本控制:通过支持版本控制,Dag 包使得 Dag 运行在整个执行期间使用相同的代码,即使在运行途中 Dag 被更新。

  • 可扩展性:使用 Dag 包,Airflow 能够通过将大量 DAG 组织为逻辑单元,高效管理大量 DAG。

  • 灵活性:Dag 包能够无缝集成外部系统,例如 Git 仓库,以获取 DAG。

Dag 包的类型

Airflow 支持多种类型的 Dag 包,每种都满足特定的使用场景

airflow.dag_processing.bundles.local.LocalDagBundle

这些包引用包含 Dag 文件的本地目录。它们非常适合开发和测试环境,但不支持包的版本控制,这意味着任务总是使用最新代码运行。

airflow.providers.git.bundles.git.GitDagBundle

这些包与 Git 仓库集成,允许 Airflow 直接从仓库获取 Dag。

airflow.providers.amazon.aws.bundles.s3.S3DagBundle

这些包引用包含 Dag 文件的 S3 存储桶。它们不支持包的版本控制,这意味着任务总是使用最新代码运行。

airflow.providers.google.cloud.bundles.gcs.GCSDagBundle

这些包引用包含 Dag 文件的 GCS 存储桶。它们不支持包的版本控制,这意味着任务总是使用最新代码运行。

配置 Dag 包

Dag 包在 dag_bundle_config_list 中配置。您可以在此添加一个或多个 Dag 包。

默认情况下,Airflow 会添加一个本地 Dag 包,它相当于旧的 Dags 文件夹。这样做是为了向后兼容,如果您不想使用可以将其移除。您也可以保留它并添加其他 Dag 包,例如 git Dag 包。

例如,将多个 Dag 包添加到您的 airflow.cfg 文件中

[dag_processor]
dag_bundle_config_list = [
    {
      "name": "my_git_repo",
      "classpath": "airflow.providers.git.bundles.git.GitDagBundle",
      "kwargs": {"tracking_ref": "main", "git_conn_id": "my_git_conn"}
    },
    {
      "name": "dags-folder",
      "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
      "kwargs": {}
    }
  ]

注意

空白字符,尤其是最后一行的空格,非常重要,以确保多行值正常工作。更多细节可参见 configparser 文档

如果您希望视图 URL 与 Dag 包默认提供的不同,可以在 Dag 包配置的 kwargs 中更改 URL。例如,若想为 git Dag 包使用自定义 URL

[dag_processor]
dag_bundle_config_list = [
    {
      "name": "my_git_repo",
      "classpath": "airflow.providers.git.bundles.git.GitDagBundle",
      "kwargs": {
        "tracking_ref": "main",
        "git_conn_id": "my_git_conn",
        "view_url_template": "https://my.custom.git.repo/view/{subdir}",
      }
    }
  ]

上面,view_url_template 被设置为自定义 URL,系统将使用该 URL 来查看 my_git_repo 包中的 DAG。{subdir} 占位符将被替换为包的 subdir 属性。占位符是包的属性,不能使用包属性之外的占位符。指定自定义 URL 时,会覆盖 Dag 包提供的默认 URL。

URL 会进行安全性验证,如果不安全,则该包的视图 URL 将被设为 None。此举是为了防止不安全 URL 带来的潜在安全问题。

您还可以通过在 kwargs 中传入 refresh_interval 来覆盖每个 Dag 包的刷新间隔。这决定了 Dag 处理器在 Dag 包中刷新或查找新文件的频率。

从 Airflow 3.0.2 起,基础镜像已预装 Git。但如果使用 3.0.2 之前的版本,需要在您的 Docker 镜像中自行安装 Git。

RUN apt-get update && apt-get install -y git
ENV GIT_PYTHON_GIT_EXECUTABLE=/usr/bin/git
ENV GIT_PYTHON_REFRESH=quiet

在用户模拟下使用 DAG 包

在使用 run_as_user(用户模拟)与 DAG 包时,确保正确配置文件权限,以便被模拟的用户能够访问由主 Airflow 进程创建的包文件。

  1. 所有被模拟的用户以及 Airflow 用户应属于同一组

  2. 配置合适的 umask 设置(例如 umask 0002

注意

这种基于权限的做法是临时方案。Airflow 的未来版本将通过基于 supervisor 的包操作来处理多用户访问,消除对共享组权限的需求。

编写自定义 Dag 包

在通过扩展 BaseDagBundle 类实现自定义 Dag 包时,需要实现若干方法。以下指南帮助您实现自定义 Dag 包。

抽象方法

以下方法为抽象方法,必须在您的自定义包类中实现

路径

此属性应返回指向存放该包 Dag 文件的目录的 Path。Airflow 使用此属性定位待处理的 Dag 文件。

get_current_version

该方法应返回包的当前版本字符串。Airflow 稍后会将此版本传递给 __init__,以在运行任务时再次获取该版本的包。如果不支持版本控制,应返回 None

refresh

该方法应处理从来源刷新包内容(例如,从远程仓库拉取最新更改)。Dag 处理器会定期调用此方法,以确保包是最新的。

可选方法

除了抽象方法之外,您还可以覆盖以下方法来自定义包的行为

__init__

此方法可扩展,以使用额外参数初始化包,例如为 GitDagBundletracking_ref。它还应调用父类的 __init__ 方法以确保正确初始化。昂贵的操作(如网络调用)应避免在此方法中执行,以免在实例化包时产生延迟,应改在 initialize 方法中完成。

initialize

该方法在包首次在 Dag 处理器或工作节点使用前被调用。它允许您仅在访问包内容时执行耗时的操作。

view_url

该方法应返回一个字符串形式的 URL,用于在外部系统(例如 Git 仓库的网页界面)上查看该包。

其他注意事项

  • 版本控制:如果您的包支持版本控制,请确保已实现 initializeget_current_versionrefresh,以处理特定版本的逻辑。

  • 并发性:工作节点可能会同时创建多个包,并且不会对对包对象的调用进行序列化。因此,如果底层技术对并发有问题,包类必须实现锁机制。例如,在克隆 Git 仓库时,需要确保同一时间只有一个包对象在克隆。基类中提供了一个 lock 方法,可在必要时使用。

  • Triggerer 限制:DAG 包不会在 triggerer 组件中初始化。实际上,这意味着 trigger 不能来源于 DAG 包,因为 triggerer 不处理随时间变化的触发代码,一切都在主进程中完成。触发器可以从 sys.path 中的其他位置加载。如果需要使用自定义触发器,请确保它们位于 Python 环境的 sys.path 中,而不是从 DAG 包中获取。

此条目是否有帮助?