Dag 包
Dag 包是一个或多个 DAG、文件以及它们关联文件的集合,例如其他 Python 脚本、配置文件或其他资源。Dag 包可以从各种位置获取 DAG,例如本地目录、Git 仓库或其他外部系统。部署管理员也可以编写自己的 Dag 包类以支持自定义来源。您还可以在 Airflow 部署中定义多个 Dag 包,以便更好地组织您的 DAG。将包保持在更高的层级,可对 Dag 运行所需的所有内容进行版本控制。
这类似于 Airflow 2 或更早版本中的 Dags 文件夹,但功能更强大。之前 Dags 必须放在本地磁盘的同一位置,且将 Dags 放置在那里完全由部署管理员负责。
由于 Dag 包支持版本控制,Airflow 还能使用特定版本的 Dag 包来运行任务,这样即使在运行过程中 Dag 被更新,整个 Dag 运行仍可使用相同的代码。
为什么 Dag 包很重要?
版本控制:通过支持版本控制,Dag 包使得 Dag 运行在整个执行期间使用相同的代码,即使在运行途中 Dag 被更新。
可扩展性:使用 Dag 包,Airflow 能够通过将大量 DAG 组织为逻辑单元,高效管理大量 DAG。
灵活性:Dag 包能够无缝集成外部系统,例如 Git 仓库,以获取 DAG。
Dag 包的类型
Airflow 支持多种类型的 Dag 包,每种都满足特定的使用场景
- airflow.dag_processing.bundles.local.LocalDagBundle
这些包引用包含 Dag 文件的本地目录。它们非常适合开发和测试环境,但不支持包的版本控制,这意味着任务总是使用最新代码运行。
- airflow.providers.git.bundles.git.GitDagBundle
这些包与 Git 仓库集成,允许 Airflow 直接从仓库获取 Dag。
- airflow.providers.amazon.aws.bundles.s3.S3DagBundle
这些包引用包含 Dag 文件的 S3 存储桶。它们不支持包的版本控制,这意味着任务总是使用最新代码运行。
- airflow.providers.google.cloud.bundles.gcs.GCSDagBundle
这些包引用包含 Dag 文件的 GCS 存储桶。它们不支持包的版本控制,这意味着任务总是使用最新代码运行。
配置 Dag 包
Dag 包在 dag_bundle_config_list 中配置。您可以在此添加一个或多个 Dag 包。
默认情况下,Airflow 会添加一个本地 Dag 包,它相当于旧的 Dags 文件夹。这样做是为了向后兼容,如果您不想使用可以将其移除。您也可以保留它并添加其他 Dag 包,例如 git Dag 包。
例如,将多个 Dag 包添加到您的 airflow.cfg 文件中
[dag_processor]
dag_bundle_config_list = [
{
"name": "my_git_repo",
"classpath": "airflow.providers.git.bundles.git.GitDagBundle",
"kwargs": {"tracking_ref": "main", "git_conn_id": "my_git_conn"}
},
{
"name": "dags-folder",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {}
}
]
注意
空白字符,尤其是最后一行的空格,非常重要,以确保多行值正常工作。更多细节可参见 configparser 文档。
如果您希望视图 URL 与 Dag 包默认提供的不同,可以在 Dag 包配置的 kwargs 中更改 URL。例如,若想为 git Dag 包使用自定义 URL
[dag_processor]
dag_bundle_config_list = [
{
"name": "my_git_repo",
"classpath": "airflow.providers.git.bundles.git.GitDagBundle",
"kwargs": {
"tracking_ref": "main",
"git_conn_id": "my_git_conn",
"view_url_template": "https://my.custom.git.repo/view/{subdir}",
}
}
]
上面,view_url_template 被设置为自定义 URL,系统将使用该 URL 来查看 my_git_repo 包中的 DAG。{subdir} 占位符将被替换为包的 subdir 属性。占位符是包的属性,不能使用包属性之外的占位符。指定自定义 URL 时,会覆盖 Dag 包提供的默认 URL。
URL 会进行安全性验证,如果不安全,则该包的视图 URL 将被设为 None。此举是为了防止不安全 URL 带来的潜在安全问题。
您还可以通过在 kwargs 中传入 refresh_interval 来覆盖每个 Dag 包的刷新间隔。这决定了 Dag 处理器在 Dag 包中刷新或查找新文件的频率。
从 Airflow 3.0.2 起,基础镜像已预装 Git。但如果使用 3.0.2 之前的版本,需要在您的 Docker 镜像中自行安装 Git。
RUN apt-get update && apt-get install -y git
ENV GIT_PYTHON_GIT_EXECUTABLE=/usr/bin/git
ENV GIT_PYTHON_REFRESH=quiet
在用户模拟下使用 DAG 包
在使用 run_as_user(用户模拟)与 DAG 包时,确保正确配置文件权限,以便被模拟的用户能够访问由主 Airflow 进程创建的包文件。
所有被模拟的用户以及 Airflow 用户应属于同一组
配置合适的 umask 设置(例如
umask 0002)
注意
这种基于权限的做法是临时方案。Airflow 的未来版本将通过基于 supervisor 的包操作来处理多用户访问,消除对共享组权限的需求。
编写自定义 Dag 包
在通过扩展 BaseDagBundle 类实现自定义 Dag 包时,需要实现若干方法。以下指南帮助您实现自定义 Dag 包。
抽象方法
以下方法为抽象方法,必须在您的自定义包类中实现
- 路径
此属性应返回指向存放该包 Dag 文件的目录的
Path。Airflow 使用此属性定位待处理的 Dag 文件。- get_current_version
该方法应返回包的当前版本字符串。Airflow 稍后会将此版本传递给
__init__,以在运行任务时再次获取该版本的包。如果不支持版本控制,应返回None。- refresh
该方法应处理从来源刷新包内容(例如,从远程仓库拉取最新更改)。Dag 处理器会定期调用此方法,以确保包是最新的。
可选方法
除了抽象方法之外,您还可以覆盖以下方法来自定义包的行为
- __init__
此方法可扩展,以使用额外参数初始化包,例如为
GitDagBundle的tracking_ref。它还应调用父类的__init__方法以确保正确初始化。昂贵的操作(如网络调用)应避免在此方法中执行,以免在实例化包时产生延迟,应改在initialize方法中完成。- initialize
该方法在包首次在 Dag 处理器或工作节点使用前被调用。它允许您仅在访问包内容时执行耗时的操作。
- view_url
该方法应返回一个字符串形式的 URL,用于在外部系统(例如 Git 仓库的网页界面)上查看该包。
其他注意事项
版本控制:如果您的包支持版本控制,请确保已实现
initialize、get_current_version和refresh,以处理特定版本的逻辑。并发性:工作节点可能会同时创建多个包,并且不会对对包对象的调用进行序列化。因此,如果底层技术对并发有问题,包类必须实现锁机制。例如,在克隆 Git 仓库时,需要确保同一时间只有一个包对象在克隆。基类中提供了一个
lock方法,可在必要时使用。Triggerer 限制:DAG 包不会在 triggerer 组件中初始化。实际上,这意味着 trigger 不能来源于 DAG 包,因为 triggerer 不处理随时间变化的触发代码,一切都在主进程中完成。触发器可以从
sys.path中的其他位置加载。如果需要使用自定义触发器,请确保它们位于 Python 环境的sys.path中,而不是从 DAG 包中获取。