DAGs¶
DAG(有向无环图)是 Airflow 的核心概念,它将 任务 收集在一起,并按依赖关系和关联关系组织,以说明它们应该如何运行。
这是一个基本的 DAG 示例
它定义了四个任务 - A、B、C 和 D - 并规定了它们必须运行的顺序,以及哪些任务依赖于其他任务。它还将说明 DAG 的运行频率 - 也许是“从明天开始每 5 分钟一次”,或“自 2020 年 1 月 1 日以来每天一次”。
DAG 本身并不关心任务内部发生什么;它只关心如何执行它们 - 它们的运行顺序、重试次数、是否有超时等等。
声明 DAG¶
有三种声明 DAG 的方法 - 你可以使用 with
语句(上下文管理器),它将隐式地将内部的任何内容添加到 DAG 中
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
):
EmptyOperator(task_id="task")
或者,你可以使用标准构造函数,将 DAG 传递给使用的任何操作符
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
或者,你可以使用 @dag
装饰器来 将函数转换为 DAG 生成器
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
EmptyOperator(task_id="task")
generate_dag()
没有要运行的 任务,DAG 就毫无意义,这些任务通常以 操作符、传感器 或 TaskFlow 的形式出现。
任务依赖¶
任务/操作符通常不会单独存在;它依赖于其他任务(它的上游),并且其他任务依赖于它(它的下游)。声明任务之间的这些依赖关系构成了 DAG 结构(有向无环图的边)。
有两种主要方法来声明单个任务依赖关系。推荐的方法是使用 >>
和 <<
操作符
first_task >> [second_task, third_task]
third_task << fourth_task
或者,你也可以使用更明确的 set_upstream
和 set_downstream
方法
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)
还有一些快捷方式来声明更复杂的依赖关系。如果要使任务列表依赖于另一个任务列表,则不能使用上述任何一种方法,因此需要使用 cross_downstream
from airflow.models.baseoperator import cross_downstream
# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])
如果要链接依赖关系,可以使用 chain
from airflow.models.baseoperator import chain
# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)
# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])
对于相同大小的列表,Chain 也可以执行成对依赖关系(这与 cross_downstream
创建的交叉依赖关系不同!)
from airflow.models.baseoperator import chain
# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
加载 DAG¶
Airflow 从 Python 源文件加载 DAG,它会在配置的 DAG_FOLDER
中查找这些文件。它将获取每个文件,执行它,然后从该文件中加载任何 DAG 对象。
这意味着你可以在每个 Python 文件中定义多个 DAG,甚至可以使用导入将一个非常复杂的 DAG 分散到多个 Python 文件中。
但是请注意,当 Airflow 从 Python 文件加载 DAG 时,它只会提取顶层的任何 DAG 实例对象。例如,采用以下 DAG 文件
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
虽然当访问该文件时会调用两个 DAG 构造函数,但只有 dag_1
位于顶层(在 globals()
中),因此只有它被添加到 Airflow 中。dag_2
未加载。
注意
在 DAG_FOLDER
中搜索 DAG 时,Airflow 仅将包含字符串 airflow
和 dag
(不区分大小写)的 Python 文件视为一种优化。
要考虑所有 Python 文件,请禁用 DAG_DISCOVERY_SAFE_MODE
配置标志。
你还可以在 DAG_FOLDER
或其任何子文件夹中提供 .airflowignore
文件,该文件描述了加载程序要忽略的文件模式。它涵盖了它所在的目录以及其下的所有子文件夹。有关文件语法的详细信息,请参见下面的 .airflowignore。
如果 .airflowignore
不满足你的需求,并且你希望有一种更灵活的方式来控制是否需要让 Airflow 解析 Python 文件,则可以通过在配置文件中设置 might_contain_dag_callable
来插入你的可调用对象。请注意,此可调用对象将替换默认的 Airflow 启发式方法,即检查 Python 文件中是否存在字符串 airflow
和 dag
(不区分大小写)。
def might_contain_dag(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
# Your logic to check if there are DAGs defined in the file_path
# Return True if the file_path needs to be parsed, otherwise False
运行 DAG¶
DAG 将以两种方式之一运行
当它们被手动或通过 API 触发时
按照定义的计划,这是作为 DAG 的一部分定义的
DAG 不需要计划,但定义一个计划非常常见。你可以通过 schedule
参数来定义它,如下所示
with DAG("my_daily_dag", schedule="@daily"):
...
schedule
参数有各种有效值
with DAG("my_daily_dag", schedule="0 0 * * *"):
...
with DAG("my_one_time_dag", schedule="@once"):
...
with DAG("my_continuous_dag", schedule="@continuous"):
...
提示
有关不同类型的调度的更多信息,请参阅 编写和调度。
每次运行 DAG 时,都会创建该 DAG 的新实例,Airflow 将其称为 DAG 运行。同一个 DAG 的 DAG 运行可以并行运行,并且每个运行都有一个定义的数据间隔,用于标识任务应操作的数据的时间段。
举个例子来说明为什么这很有用,考虑编写一个处理每日实验数据集的 DAG。它已被重写,并且你希望在过去 3 个月的数据上运行它 - 没问题,因为 Airflow 可以回填 DAG 并为过去 3 个月中的每一天运行其副本,所有副本同时运行。
这些 DAG 运行都将在同一实际日期开始,但每个 DAG 运行将有一个数据间隔,涵盖该 3 个月期间的某一天,并且该数据间隔是 DAG 内的所有任务、操作符和传感器在运行时查看的内容。
与 DAG 每次运行时实例化为 DAG 运行的方式非常相似,DAG 中指定的任务也会随之实例化为 任务实例。
DAG 运行在开始时会有一个开始日期,在结束时会有一个结束日期。此期间描述了 DAG 实际“运行”的时间。除了 DAG 运行的开始和结束日期之外,还有另一个日期称为逻辑日期(正式称为执行日期),它描述了计划或触发 DAG 运行的预期时间。之所以称之为逻辑,是因为它具有抽象的性质,并且根据 DAG 运行本身的上下文而具有多种含义。
例如,如果 DAG 运行由用户手动触发,则其逻辑日期将是触发 DAG 运行的日期和时间,并且该值应等于 DAG 运行的开始日期。但是,当 DAG 被自动调度,并设置了特定的调度间隔时,逻辑日期将指示它标记数据间隔开始的时间,其中 DAG 运行的开始日期将是逻辑日期 + 调度间隔。
提示
有关 逻辑日期
的更多信息,请参阅 数据间隔 和 执行日期是什么意思?。
DAG 分配¶
请注意,每个操作符/任务都必须分配给 DAG 才能运行。Airflow 有几种计算 DAG 的方法,无需你显式传递它
如果你在
with DAG
块中声明你的操作符如果你在
@dag
装饰器中声明你的操作符如果你将你的操作符放在具有 DAG 的操作符的上游或下游
否则,你必须使用 dag=
将其传递给每个操作符。
默认参数¶
通常,DAG 中的许多 Operator 需要相同的默认参数集(例如它们的 retries
)。您不必为每个 Operator 单独指定这些参数,而是在创建 DAG 时将 default_args
传递给 DAG,它会自动将其应用于与其绑定的任何 Operator。
import pendulum
with DAG(
dag_id="my_dag",
start_date=pendulum.datetime(2016, 1, 1),
schedule="@daily",
default_args={"retries": 2},
):
op = BashOperator(task_id="hello_world", bash_command="Hello World!")
print(op.retries) # 2
DAG 装饰器¶
2.0 版本新增功能。
除了使用上下文管理器或 DAG()
构造函数声明单个 DAG 的传统方法外,您还可以使用 @dag
装饰一个函数,将其转换为 DAG 生成器函数。
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def example_dag_decorator(email: str = "[email protected]"):
"""
DAG to send server IP to email.
:param email: Email to send IP to. Defaults to [email protected].
"""
get_ip = GetRequestOperator(task_id="get_ip", url="http://httpbin.org/get")
@task(multiple_outputs=True)
def prepare_email(raw_json: dict[str, Any]) -> dict[str, str]:
external_ip = raw_json["origin"]
return {
"subject": f"Server connected from {external_ip}",
"body": f"Seems like today your server executing Airflow is connected from IP {external_ip}<br>",
}
email_info = prepare_email(get_ip.output)
EmailOperator(
task_id="send_email", to=email, subject=email_info["subject"], html_content=email_info["body"]
)
example_dag = example_dag_decorator()
除了是一种简洁地创建 DAG 的新方法外,该装饰器还会将函数中的任何参数设置为 DAG 参数,允许您在触发 DAG 时设置这些参数。然后,您可以从 Python 代码或从 {{ context.params }}
访问参数,在 Jinja 模板中使用。
注意
Airflow 只会加载 DAG 文件顶层中出现的 DAG。这意味着您不能只使用 @dag
声明一个函数 - 您还必须在 DAG 文件中至少调用一次该函数,并将其分配给一个顶层对象,如上面的示例所示。
控制流¶
默认情况下,只有当其依赖的所有 Task 都成功时,DAG 才会运行 Task。但是,有几种方法可以修改此行为。
分支 - 根据条件选择要移动到的 Task
触发规则 - 设置 DAG 运行 Task 的条件
设置和拆卸 - 定义设置和拆卸关系
仅最新 - 一种特殊的分支形式,仅针对当前运行的 DAG 运行
依赖于过去 - Task 可以依赖于 *先前运行* 中的自身
分支¶
您可以使用分支来告诉 DAG *不* 运行所有依赖的 Task,而是选择一个或多个路径进行处理。这就是 @task.branch
装饰器的作用。
@task.branch
装饰器与 @task
非常相似,不同之处在于它期望被装饰的函数返回一个 Task 的 ID(或 ID 列表)。指定的 Task 将被执行,而所有其他路径将被跳过。它也可以返回 *None* 来跳过所有下游 Task。
Python 函数返回的 task_id 必须引用 @task.branch
装饰的 Task 的直接下游 Task。
注意
当 Task 既是分支 Operator 的下游,又是一个或多个选定 Task 的下游时,它将不会被跳过。
分支 Task 的路径为 branch_a
、join
和 branch_b
。由于 join
是 branch_a
的下游 Task,即使它没有作为分支决策的一部分返回,它仍然会运行。
@task.branch
也可以与 XComs 一起使用,允许分支上下文根据上游 Task 动态决定要遵循哪个分支。例如
@task.branch(task_id="branch_task")
def branch_func(ti=None):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
return "continue_task"
elif xcom_value >= 3:
return "stop_task"
else:
return None
start_op = BashOperator(
task_id="start_task",
bash_command="echo 5",
do_xcom_push=True,
dag=dag,
)
branch_op = branch_func()
continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)
start_op >> branch_op >> [continue_op, stop_op]
如果您希望使用分支功能实现自己的 Operator,则可以继承 BaseBranchOperator
,其行为类似于 @task.branch
装饰器,但期望您提供方法 choose_branch
的实现。
注意
建议使用 @task.branch
装饰器,而不是直接在 DAG 中实例化 BranchPythonOperator
。后者通常只应被子类化以实现自定义 Operator。
与 @task.branch
的可调用对象一样,此方法可以返回下游 Task 的 ID 或 Task ID 列表,这些 Task 将被运行,而所有其他 Task 将被跳过。它也可以返回 None 以跳过所有下游 Task。
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
"""
Run an extra branch on the first day of the month
"""
if context['data_interval_start'].day == 1:
return ['daily_task_id', 'monthly_task_id']
elif context['data_interval_start'].day == 2:
return 'daily_task_id'
else:
return None
与常规 Python 代码的 @task.branch
装饰器类似,还有使用名为 @task.branch_virtualenv
的虚拟环境或名为 @task.branch_external_python
的外部 Python 的分支装饰器。
仅最新¶
Airflow 的 DAG 运行通常针对与当前日期不同的日期运行 - 例如,为上个月的每一天运行一个 DAG 副本以回填一些数据。
但是,在某些情况下,您*不*希望让 DAG 的某些(或全部)部分为之前的日期运行;在这种情况下,您可以使用 LatestOnlyOperator
。
如果您不是在“最新”的 DAG 运行中(如果现在的挂钟时间在其 execution_time 和下一个计划的 execution_time 之间,并且它不是外部触发的运行),则此特殊 Operator 会跳过其下游的所有 Task。
这是一个例子
import datetime
import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG(
dag_id="latest_only_with_trigger",
schedule=datetime.timedelta(hours=4),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example3"],
) as dag:
latest_only = LatestOnlyOperator(task_id="latest_only")
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)
latest_only >> task1 >> [task3, task4]
task2 >> [task3, task4]
对于此 DAG 的情况
task1
直接是latest_only
的下游,除了最新的运行之外,所有运行都会跳过它。task2
完全独立于latest_only
,并且将在所有计划的时间段内运行。task3
是task1
和task2
的下游,并且由于默认的触发规则为all_success
,它将收到来自task1
的级联跳过。task4
是task1
和task2
的下游,但它不会被跳过,因为它的trigger_rule
设置为all_done
。
依赖于过去¶
您还可以说,只有当先前 DAG 运行中该 Task 的 *先前* 运行成功时,Task 才能运行。要使用此功能,只需将 Task 上的 depends_on_past
参数设置为 True
。
请注意,如果您在 DAG 的生命周期开始时运行 DAG - 特别是它的首次 *自动* 运行 - 则 Task 仍将运行,因为没有先前的运行可依赖。
触发规则¶
默认情况下,Airflow 将等待 Task 的所有上游(直接父级)Task 成功,然后才会运行该 Task。
但是,这只是默认行为,您可以使用 Task 的 trigger_rule
参数对其进行控制。trigger_rule
的选项如下
all_success
(默认):所有上游 Task 都已成功all_failed
:所有上游 Task 都处于failed
或upstream_failed
状态all_done
:所有上游 Task 都已完成执行all_skipped
:所有上游 Task 都处于skipped
状态one_failed
:至少有一个上游 Task 失败(不等待所有上游 Task 完成)one_success
:至少有一个上游 Task 成功(不等待所有上游 Task 完成)one_done
:至少一个上游 Task 成功或失败none_failed
:所有上游 Task 都未failed
或upstream_failed
- 也就是说,所有上游 Task 都已成功或被跳过none_failed_min_one_success
:所有上游 Task 都未failed
或upstream_failed
,并且至少有一个上游 Task 成功。none_skipped
: 没有上游任务处于skipped
状态 - 也就是说,所有上游任务都处于success
、failed
或upstream_failed
状态。always
: 完全没有依赖关系,随时运行此任务。
如果你愿意,也可以将此与 依赖过去 功能结合使用。
注意
了解触发规则和跳过任务之间的交互非常重要,特别是作为分支操作一部分跳过的任务。你几乎不希望在分支操作的下游使用 all_success 或 all_failed。
跳过的任务将通过触发规则 all_success
和 all_failed
级联,并导致它们也被跳过。考虑以下 DAG
# dags/branch_without_trigger.py
import pendulum
from airflow.decorators import task
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
dag = DAG(
dag_id="branch_without_trigger",
schedule="@once",
start_date=pendulum.datetime(2019, 2, 28, tz="UTC"),
)
run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)
@task.branch(task_id="branching")
def do_branching():
return "branch_a"
branching = do_branching()
branch_a = EmptyOperator(task_id="branch_a", dag=dag)
follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)
branch_false = EmptyOperator(task_id="branch_false", dag=dag)
join = EmptyOperator(task_id="join", dag=dag)
run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join
join
是 follow_branch_a
和 branch_false
的下游。 join
任务将显示为已跳过,因为其 trigger_rule
默认设置为 all_success
,并且由分支操作引起的跳过级联,以跳过标记为 all_success
的任务。
通过将 join
任务中的 trigger_rule
设置为 none_failed_min_one_success
,我们可以获得预期的行为。
动态 DAG¶
由于 DAG 由 Python 代码定义,因此无需纯粹声明;您可以自由使用循环、函数等来定义 DAG。
例如,这是一个使用 for
循环定义一些任务的 DAG
with DAG("loop_example", ...):
first = EmptyOperator(task_id="first")
last = EmptyOperator(task_id="last")
options = ["branch_a", "branch_b", "branch_c", "branch_d"]
for option in options:
t = EmptyOperator(task_id=option)
first >> t >> last
一般来说,我们建议您尽量保持 DAG 任务的*拓扑结构*(布局)相对稳定;动态 DAG 通常更适合用于动态加载配置选项或更改操作符选项。
DAG 可视化¶
如果要查看 DAG 的可视化表示,您有两个选择
您可以加载 Airflow UI,导航到您的 DAG,然后选择“图表”
您可以运行
airflow dags show
,它会将其渲染为图像文件
我们通常建议您使用图表视图,因为它还将显示您选择的任何 DAG 运行中所有 任务实例 的状态。
当然,随着您开发 DAG,它们将变得越来越复杂,因此我们提供了一些修改这些 DAG 视图的方法,使其更易于理解。
任务组¶
任务组可用于在图表视图中将任务组织成层次结构组。它对于创建重复模式和减少视觉混乱很有用。
与 子 DAG 不同,任务组纯粹是 UI 分组概念。任务组中的任务位于同一个原始 DAG 上,并遵守所有 DAG 设置和池配置。
可以使用 >>
和 <<
操作符在任务组中的所有任务之间应用依赖关系。例如,以下代码将 task1
和 task2
放入任务组 group1
,然后将这两个任务都放在 task3
的上游。
from airflow.decorators import task_group
@task_group()
def group1():
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
group1() >> task3
任务组还支持像 DAG 一样的 default_args
,它将覆盖 DAG 级别的 default_args
。
import datetime
from airflow import DAG
from airflow.decorators import task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="dag1",
start_date=datetime.datetime(2016, 1, 1),
schedule="@daily",
default_args={"retries": 1},
):
@task_group(default_args={"retries": 3})
def group1():
"""This docstring will become the tooltip for the TaskGroup."""
task1 = EmptyOperator(task_id="task1")
task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2)
print(task1.retries) # 3
print(task2.retries) # 2
如果要查看任务组的更高级用法,可以查看 Airflow 附带的 example_task_group_decorator.py
示例 DAG。
注意
默认情况下,子任务/任务组的 ID 会附加其父任务组的 group_id 前缀。这有助于确保整个 DAG 中 group_id 和 task_id 的唯一性。
要禁用前缀,请在创建任务组时传递 prefix_group_id=False
,但请注意,您现在需要负责确保每个任务和组都具有唯一的 ID。
注意
当使用 @task_group
修饰器时,修饰函数的文档字符串将用作 UI 中任务组的工具提示,除非显式提供 tooltip
值。
边缘标签¶
除了将任务分组到组中之外,您还可以在图表视图中标记不同任务之间的依赖关系边缘 - 这对于 DAG 的分支区域尤其有用,因此您可以标记某些分支可能运行的条件。
要添加标签,可以直接将它们与 >>
和 <<
操作符一起使用
from airflow.utils.edgemodifier import Label
my_task >> Label("When empty") >> other_task
或者,您可以将标签对象传递给 set_upstream
/set_downstream
from airflow.utils.edgemodifier import Label
my_task.set_downstream(other_task, Label("When empty"))
这是一个说明标记不同分支的示例 DAG
with DAG(
"example_branch_labels",
schedule="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
ingest = EmptyOperator(task_id="ingest")
analyse = EmptyOperator(task_id="analyze")
check = EmptyOperator(task_id="check_integrity")
describe = EmptyOperator(task_id="describe_integrity")
error = EmptyOperator(task_id="email_error")
save = EmptyOperator(task_id="save")
report = EmptyOperator(task_id="report")
ingest >> analyse >> check
check >> Label("No errors") >> save >> report
check >> Label("Errors found") >> describe >> error >> report
DAG 和任务文档¶
可以向 DAG 和任务对象添加文档或注释,这些文档或注释在 Web 界面中可见(DAG 的“图表”和“树”,任务的“任务实例详细信息”)。
有一组特殊的任务属性,如果定义,它们将呈现为富内容
属性 |
呈现为 |
---|---|
doc |
等宽字体 |
doc_json |
json |
doc_yaml |
yaml |
doc_md |
markdown |
doc_rst |
reStructuredText |
请注意,对于 DAG,doc_md
是唯一被解释的属性。对于 DAG,它可以包含字符串或对 markdown 文件的引用。Markdown 文件通过以 .md
结尾的字符串来识别。如果提供相对路径,则将从 Airflow 调度程序或 DAG 解析器启动的相对路径加载。如果 markdown 文件不存在,则会将传递的文件名用作文本,不会显示异常。请注意,markdown 文件在 DAG 解析期间加载,markdown 内容的更改需要一个 DAG 解析周期才能显示更改。
如果您的任务是根据配置文件动态构建的,这将特别有用,因为它允许您在 Airflow 中公开导致相关任务的配置。
"""
### My great DAG
"""
import pendulum
dag = DAG(
"my_dag",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="@daily",
catchup=False,
)
dag.doc_md = __doc__
t = BashOperator("foo", dag=dag)
t.doc_md = """\
#Title"
Here's a [url](www.airbnb.com)
"""
子 DAG¶
注意
子 DAG 已弃用,因此任务组始终是首选。
有时,您会发现您经常将完全相同的任务集添加到每个 DAG,或者您希望将许多任务分组到一个单独的逻辑单元中。这就是子 DAG 的用途。
例如,这是一个在两个部分中具有大量并行任务的 DAG
我们可以将所有并行的 task-*
操作符组合到一个子 DAG 中,以便生成的 DAG 类似于以下内容
请注意,子 DAG 操作符应包含返回 DAG 对象的工厂方法。这将防止子 DAG 在主 UI 中被视为单独的 DAG - 请记住,如果 Airflow 在 Python 文件的顶层看到 DAG,它会将其加载为自己的 DAG。例如
import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
def subdag(parent_dag_name, child_dag_name, args) -> DAG:
"""
Generate a DAG to be used as a subdag.
:param str parent_dag_name: Id of the parent DAG
:param str child_dag_name: Id of the child DAG
:param dict args: Default arguments to provide to the subdag
:return: DAG to use as a subdag
"""
dag_subdag = DAG(
dag_id=f"{parent_dag_name}.{child_dag_name}",
default_args=args,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
)
for i in range(5):
EmptyOperator(
task_id=f"{child_dag_name}-task-{i + 1}",
default_args=args,
dag=dag_subdag,
)
return dag_subdag
然后,可以在主 DAG 文件中引用此子 DAG
import datetime
from airflow.example_dags.subdags.subdag import subdag
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator
DAG_NAME = "example_subdag_operator"
with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=datetime.datetime(2022, 1, 1),
schedule="@once",
tags=["example"],
) as dag:
start = EmptyOperator(
task_id="start",
)
section_1 = SubDagOperator(
task_id="section-1",
subdag=subdag(DAG_NAME, "section-1", dag.default_args),
)
some_other_task = EmptyOperator(
task_id="some-other-task",
)
section_2 = SubDagOperator(
task_id="section-2",
subdag=subdag(DAG_NAME, "section-2", dag.default_args),
)
end = EmptyOperator(
task_id="end",
)
start >> section_1 >> some_other_task >> section_2 >> end
您可以从主 DAG 的图表视图中放大 SubDagOperator
,以显示子 DAG 中包含的任务
使用子 DAG 时的一些其他提示
按照惯例,子 DAG 的
dag_id
应以其父 DAG 的名称和点号 (parent.child
) 作为前缀您应该通过将参数传递给子 DAG 操作符(如上所示)在主 DAG 和子 DAG 之间共享参数
子 DAG 必须具有计划且已启用。如果子 DAG 的计划设置为
None
或@once
,则子 DAG 将在不执行任何操作的情况下成功。清除
SubDagOperator
也会清除其中任务的状态。在
SubDagOperator
上标记成功不会影响其中任务的状态。避免在子 DAG 中的任务中使用 依赖过去,因为这可能会造成混淆。
您可以为子 DAG 指定执行器。如果您希望在进程内运行子 DAG 并有效地将其并行度限制为 1,则通常使用 SequentialExecutor。使用 LocalExecutor 可能会有问题,因为它可能会过度订阅您的工作程序,从而在单个插槽中运行多个任务。
有关演示,请参阅 airflow/example_dags
。
注意
并行性*不被* SubDagOperator
*遵守*,因此资源可能会被 SubdagOperators 消耗,超出您可能已设置的任何限制。
任务组与子 DAG¶
SubDAG(子DAG)虽然与 TaskGroups(任务组) 的用途相似,但由于其实现方式,引入了性能和功能上的问题。
SubDagOperator 启动一个 BackfillJob(回填作业),该作业会忽略现有的并行配置,可能导致工作环境资源超额分配。
SubDAG 有其自身的 DAG 属性。当 SubDAG 的 DAG 属性与其父 DAG 不一致时,可能会发生意外行为。
由于 SubDAG 作为完全独立的 DAG 存在,因此无法在一个视图中看到“完整”的 DAG。
SubDAG 引入了各种边缘情况和注意事项。这会扰乱用户体验和期望。
另一方面,TaskGroups 是一个更好的选择,因为它纯粹是一个 UI 分组概念。TaskGroup 内的所有任务的行为仍然与 TaskGroup 之外的任何其他任务相同。
你可以看到这两种构造之间的核心差异。
TaskGroup(任务组) |
SubDAG(子DAG) |
---|---|
作为同一个 DAG 的一部分重复模式 |
作为单独的 DAG 重复模式 |
DAG 的一组视图和统计信息 |
父 DAG 和子 DAG 之间分开的视图和统计信息 |
一组 DAG 配置 |
多组 DAG 配置 |
通过现有的 SchedulerJob 遵循并行配置 |
由于新产生的 BackfillJob 而不遵循并行配置 |
使用上下文管理器的简单结构声明 |
带有命名限制的复杂 DAG 工厂 |
打包 DAG¶
虽然较简单的 DAG 通常只在一个 Python 文件中,但更复杂的 DAG 可能会分布在多个文件中,并且具有应该与它们一起发布的依赖项(“供应商”)。
你可以在 DAG_FOLDER
内使用标准的文件系统布局来完成此操作,也可以将 DAG 及其所有 Python 文件打包成单个 zip 文件。例如,你可以将两个 DAG 以及它们需要的依赖项打包成一个 zip 文件,其内容如下
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
请注意,打包的 DAG 有一些注意事项
如果启用了序列化的 pickling,则无法使用它们
它们不能包含已编译的库(例如
libz.so
),只能包含纯 Python它们将被插入到 Python 的
sys.path
中,并且可以被 Airflow 进程中的任何其他代码导入,因此请确保包名称与系统中已安装的其他包不冲突。
一般来说,如果你有一组复杂的已编译依赖项和模块,你最好使用 Python virtualenv
系统,并在目标系统上使用 pip
安装必要的软件包。
.airflowignore
¶
.airflowignore
文件指定 DAG_FOLDER
或 PLUGINS_FOLDER
中 Airflow 应有意忽略的目录或文件。Airflow 支持文件中模式的两种语法风格,由 DAG_IGNORE_FILE_SYNTAX
配置参数指定(在 Airflow 2.3 中添加):regexp
和 glob
。
注意
默认的 DAG_IGNORE_FILE_SYNTAX
是 regexp
,以确保向后兼容。
对于 regexp
模式语法(默认),.airflowignore
中的每一行都指定一个正则表达式模式,并且名称(不是 DAG id)与任何模式匹配的目录或文件将被忽略(在底层,使用 Pattern.search()
来匹配模式)。使用 #
字符表示注释;将忽略以 #
开头的行上的所有字符。
与 Airflow 中大多数正则表达式匹配一样,正则表达式引擎是 re2
,它明确不支持许多高级功能,请查看其 文档 以获取更多信息。
使用 glob
语法,模式的工作方式与 .gitignore
文件中的模式相同
*
字符将匹配除/
之外的任意数量的字符?
字符将匹配除/
之外的任何单个字符可以使用范围表示法(例如
[a-zA-Z]
)来匹配范围中的一个字符可以通过添加前缀
!
来否定模式。模式按顺序求值,因此否定可以覆盖同一文件中先前定义的模式或父目录中定义的模式。可以使用双星号 (
**
) 来匹配跨目录。例如,**/__pycache__/
将忽略每个子目录中的__pycache__
目录,深度不限。如果模式的开头或中间(或两者)有
/
,则该模式相对于特定的 .airflowignore 文件本身的目录级别。否则,该模式也可能匹配 .airflowignore 级别下的任何级别。
.airflowignore
文件应放在你的 DAG_FOLDER
中。例如,你可以使用 regexp
语法准备一个 .airflowignore
文件,其内容如下
project_a
tenant_[\d]
或者,等效地,使用 glob
语法
**/*project_a*
tenant_[0-9]*
然后,你的 DAG_FOLDER
中的 project_a_dag_1.py
、TESTING_project_a.py
、tenant_1.py
、project_a/dag_1.py
和 tenant_1/dag_1.py
等文件将被忽略(如果目录的名称与任何模式匹配,则 Airflow 将完全不会扫描此目录及其所有子文件夹。这提高了 DAG 查找的效率)。
.airflowignore
文件的作用域是它所在的目录及其所有子文件夹。你还可以为 DAG_FOLDER
中的子文件夹准备 .airflowignore
文件,它仅适用于该子文件夹。
DAG 依赖项¶
在 Airflow 2.1 中添加
虽然 DAG 中任务之间的依赖关系是通过上游和下游关系显式定义的,但 DAG 之间的依赖关系要复杂一些。一般来说,一个 DAG 可以通过两种方式依赖于另一个 DAG
等待 -
ExternalTaskSensor
另一个困难之处在于,一个 DAG 可以等待或触发另一个 DAG 的多个运行,且具有不同的数据间隔。**Dag 依赖项** 视图 菜单 -> 浏览 -> DAG 依赖项
有助于可视化 DAG 之间的依赖关系。依赖关系由调度器在 DAG 序列化期间计算,Web 服务器使用它们来构建依赖关系图。
依赖项检测器是可配置的,因此你可以实现自己的逻辑,而不是 DependencyDetector
中的默认逻辑
DAG 暂停、停用和删除¶
当 DAG 处于“未运行”状态时,它们有几种状态。DAG 可以暂停、停用,最后可以删除 DAG 的所有元数据。
当 DAG 存在于 DAGS_FOLDER
中,并且调度程序将其存储在数据库中,但用户选择通过 UI 禁用它时,可以通过 UI 暂停 DAG。可以通过 UI 和 API 执行“暂停”和“取消暂停”操作。暂停的 DAG 不会由调度程序调度,但是你可以通过 UI 触发它们进行手动运行。在 UI 中,你可以看到暂停的 DAG(在 暂停
选项卡中)。取消暂停的 DAG 可以在 活动
选项卡中找到。当 DAG 被暂停时,允许任何正在运行的任务完成,并且所有下游任务都将置于“已调度”状态。当 DAG 取消暂停时,任何“已调度”的任务都将根据 DAG 逻辑开始运行。没有“已调度”任务的 DAG 将根据其计划开始运行。
可以通过将 DAG 从 DAGS_FOLDER
中移除来停用 DAG(不要与 UI 中的 Active
标签混淆)。当调度器解析 DAGS_FOLDER
并发现之前见过并存储在数据库中的 DAG 不存在时,它会将其设置为停用状态。已停用 DAG 的元数据和历史记录会保留,当 DAG 重新添加到 DAGS_FOLDER
时,它会再次被激活,并且历史记录也会可见。您不能通过 UI 或 API 激活/停用 DAG,这只能通过从 DAGS_FOLDER
中删除文件来完成。再次强调,当 DAG 被调度器停用时,它的历史运行数据不会丢失。请注意,Airflow UI 中的 Active
选项卡指的是那些既不是 Activated
又不是 paused
的 DAG,这最初可能会有些令人困惑。
您在 UI 中看不到已停用的 DAG - 您有时可以看到历史运行记录,但当您尝试查看这些信息时,会看到 DAG 丢失的错误。
您还可以使用 UI 或 API 从元数据数据库中删除 DAG 元数据,但这并不总是会导致 DAG 从 UI 中消失 - 这最初也可能有点令人困惑。如果当您删除元数据时,DAG 仍然存在于 DAGS_FOLDER
中,则 DAG 将重新出现,因为调度器会解析该文件夹,只会删除 DAG 的历史运行信息。
这意味着如果您想真正删除一个 DAG 及其所有历史元数据,您需要执行以下三个步骤
暂停 DAG
通过 UI 或 API 从数据库中删除历史元数据
从
DAGS_FOLDER
中删除 DAG 文件,并等待其变为非活动状态
DAG 自动暂停(实验性)¶
DAG 也可以配置为自动暂停。有一个 Airflow 配置允许在 DAG 连续失败 N
次后自动禁用该 DAG。
我们还可以从 DAG 参数中提供和覆盖这些配置
max_consecutive_failed_dag_runs
:覆盖 max_consecutive_failed_dag_runs_per_dag。