最佳实践¶
创建新 DAG 是一个三步过程
编写 Python 代码创建 DAG 对象,
测试代码是否符合预期,
配置运行 DAG 的环境依赖项
本教程将向您介绍这三个步骤的最佳实践。
编写 DAG¶
在 Airflow 中创建新 DAG 相当简单。然而,需要注意许多事项,以确保 DAG 运行或失败不会产生意外结果。
创建自定义 Operator/Hook¶
请遵循我们的 自定义 Operator 指南。
创建任务¶
您应将 Airflow 中的任务视为数据库中的事务。这意味着您永远不应从任务中产生不完整的结果。例如,任务结束时不在 HDFS
或 S3
中产生不完整的数据。
Airflow 可以在任务失败时重试。因此,任务应在每次重运行时产生相同的结果。以下是一些避免产生不同结果的方法 -
任务重运行时不要使用 INSERT,INSERT 语句可能导致数据库中出现重复行。请将其替换为 UPSERT。
在特定分区中读写。永远不要在任务中读取最新的可用数据。有人可能在重运行之间更新输入数据,导致不同的输出。更好的方法是从特定分区读取输入数据。您可以使用
data_interval_start
作为分区。在向 S3/HDFS 写入数据时,也应遵循此分区方法。Python 的 datetime
now()
函数提供当前 datetime 对象。永远不应在任务内部使用此函数,尤其是在进行关键计算时,因为它会导致每次运行时产生不同的结果。例如,用于生成临时日志是没问题的。
提示
您应在 default_args
中定义重复参数,例如 connection_id
或 S3 路径,而不是为每个任务声明。 default_args
有助于避免诸如打字错误之类的错误。此外,大多数连接类型在任务中都有唯一的参数名称,因此您可以在 default_args
中只声明一次连接(例如 gcp_conn_id
),使用此连接类型的所有 Operator 将自动使用它。
删除任务¶
删除 DAG 中的任务时要小心。您将无法在 Graph View、Grid View 等视图中看到该任务,从而难以从 Webserver 查看该任务的日志。如果不希望这样,请创建新的 DAG。
通信¶
如果您使用 Kubernetes executor 或 Celery executor,Airflow 会在不同的服务器上执行 DAG 的任务。因此,不应在本地文件系统中存储任何文件或配置,因为下一个任务很可能在没有访问权限的不同服务器上运行 — 例如,一个下载数据文件的任务,而下一个任务将处理该文件。在使用 Local executor
的情况下,将文件存储在磁盘上会使重试变得更困难,例如,您的任务需要一个配置文件,而该文件已被 DAG 中的另一个任务删除。
如果可能,使用 XCom
在任务之间传递小消息,而传递大数据的良好方法是使用远程存储,例如 S3/HDFS。例如,如果有一个任务将处理后的数据存储在 S3 中,该任务可以将输出数据的 S3 路径推送到 XCom
中,下游任务可以从 XCom 中拉取路径并使用它读取数据。
任务也不应在其中存储任何认证参数,例如密码或令牌。尽可能使用 Connections 将数据安全地存储在 Airflow 后端,并使用唯一的连接 ID 检索它们。
顶层 Python 代码¶
您应避免编写与创建 Operator 和构建 DAG 之间的关系无关的顶层代码。这是因为 Airflow 调度器的设计决策以及顶层代码解析速度对 Airflow 性能和可伸缩性的影响。
Airflow 调度器以至少 min_file_process_interval 秒的最小间隔执行 Operator 的 execute
方法之外的代码。这样做是为了允许动态调度 DAG - 调度和依赖项可能会随时间变化并影响 DAG 的下一个调度。Airflow 调度器会持续确保您在 DAG 中定义的内容正确反映在已调度的任务中。
具体来说,不应运行任何数据库访问、重计算和网络操作。
影响 DAG 加载时间的一个重要因素(可能被 Python 开发者忽视)是顶层导入可能出奇地耗时,并且它们会产生大量开销,例如,可以通过将它们转换为 Python 可调用对象中的局部导入来轻松避免这种情况。
考虑以下两个示例。在第一个示例中,DAG 解析时间会比功能等效的第二个示例多花费 1000 秒,在第二个示例中 expensive_api_call
是在其任务的上下文中执行的。
不避免顶层 DAG 代码
import pendulum
from airflow.sdk import DAG
from airflow.sdk import task
def expensive_api_call():
print("Hello from Airflow!")
sleep(1000)
my_expensive_response = expensive_api_call()
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
@task()
def print_expensive_api_call():
print(my_expensive_response)
避免顶层 DAG 代码
import pendulum
from airflow.sdk import DAG
from airflow.sdk import task
def expensive_api_call():
sleep(1000)
return "Hello from Airflow!"
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
@task()
def print_expensive_api_call():
my_expensive_response = expensive_api_call()
print(my_expensive_response)
在第一个示例中,expensive_api_call
在每次解析 DAG 文件时都会执行,这将导致 DAG 文件处理性能低下。在第二个示例中,expensive_api_call
仅在任务运行时调用,因此可以在不影响性能的情况下进行解析。要亲自测试,实现第一个 DAG,然后在调度器日志中查看是否打印了“Hello from Airflow!”!
请注意,import 语句也算作顶层代码。因此,如果您的 import 语句花费很长时间,或者被导入的模块本身在顶层执行代码,这也会影响调度器的性能。以下示例说明了如何处理耗时的导入。
# It's ok to import modules that are not expensive to load at top-level of a DAG file
import random
import pendulum
# Expensive imports should be avoided as top level imports, because DAG files are parsed frequently, resulting in top-level code being executed.
#
# import pandas
# import torch
# import tensorflow
#
...
@task()
def do_stuff_with_pandas_and_torch():
import pandas
import torch
# do some operations using pandas and torch
@task()
def do_stuff_with_tensorflow():
import tensorflow
# do some operations using tensorflow
如何检查我的代码是否是“顶层”代码¶
为了理解您的代码是否是“顶层”代码,您需要了解 Python 解析工作原理的许多复杂细节。一般来说,当 Python 解析 Python 文件时,它会执行它看到的代码,除了(通常)它不执行的方法的内部代码。
它有一些不明显的特殊情况 - 例如,顶层代码也指用于确定方法默认值的任何代码。
然而,有一种简单的方法可以检查您的代码是否是“顶层”代码。您只需解析您的代码,看看该段代码是否被执行。
想象这段代码
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
def get_task_id():
return "print_array_task" # <- is that code going to be executed?
def get_array():
return [1, 2, 3] # <- is that code going to be executed?
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
operator = PythonOperator(
task_id=get_task_id(),
python_callable=get_array,
dag=dag,
)
您可以做的检查方法是,在您想检查的代码中添加一些 print 语句,然后运行 python <my_dag_file>.py
。
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
def get_task_id():
print("Executing 1")
return "print_array_task" # <- is that code going to be executed? YES
def get_array():
print("Executing 2")
return [1, 2, 3] # <- is that code going to be executed? NO
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
operator = PythonOperator(
task_id=get_task_id(),
python_callable=get_array,
dag=dag,
)
执行该代码时,您将看到
root@cf85ab34571e:/opt/airflow# python /files/test_python.py
Executing 1
这意味着 get_array
未作为顶层代码执行,而 get_task_id
执行了。
代码质量与 Linting¶
保持高代码质量对于 Airflow 工作流的可靠性和可维护性至关重要。利用 linting 工具可以帮助识别潜在问题并强制执行编码标准。ruff
就是这样一种工具,它是一种快速的 Python linter,现在包含了针对 Airflow 的特定规则。
ruff
有助于检测可能影响您迁移到 Airflow 3.0 的已弃用功能和模式。例如,它包含以 AIR
为前缀的规则来标记潜在问题
AIR301: 标记没有显式
schedule
参数的 DAG。AIR302: 识别已弃用参数
schedule_interval
的使用。AIR303: 检测从 Airflow 3.0 中已迁移或移除的模块进行的导入。
安装和使用 ruff¶
安装: 使用 pip 安装
ruff
pip install "ruff>=0.9.5"
运行 ruff: 执行
ruff
检查您的 dag 是否存在潜在问题ruff check dags/ --select AIR301,AIR302,AIR303
此命令将分析位于
dags/
目录中的 dag,并报告与指定规则相关的任何问题。
示例¶
给定如下定义的旧版 DAG
from airflow import dag
from airflow.datasets import Dataset
from airflow.sensors.filesystem import FileSensor
@dag()
def legacy_dag():
FileSensor(task_id="wait_for_file", filepath="/tmp/test_file")
运行 ruff
将产生
dags/legacy_dag.py:7:2: AIR301 DAG should have an explicit schedule argument
dags/legacy_dag.py:12:6: AIR302 schedule_interval is removed in Airflow 3.0
dags/legacy_dag.py:17:15: AIR302 airflow.datasets.Dataset is removed in Airflow 3.0
dags/legacy_dag.py:19:5: AIR303 airflow.sensors.filesystem.FileSensor is moved into ``standard`` provider in Airflow 3.0
通过将 ruff
集成到您的开发工作流中,您可以主动处理已弃用内容并维护代码质量,从而促进 Airflow 版本之间的平滑过渡。
有关 ruff
及其与 Airflow 集成的更多信息,请参阅 Airflow 官方文档。
动态 DAG 生成¶
有时手动编写 dag 不太实际。也许您有很多做类似事情的 dag,它们之间只是一个参数不同。或者您可能需要一组 dag 来加载表,但不希望在表更改时手动更新 dag。在这些或其他情况下,动态生成 dag 可能更有用。
避免在上一章描述的顶层代码中进行过多处理对于动态 DAG 配置尤为重要,动态 DAG 配置基本上可以通过以下方式之一进行配置
通过 环境变量(不要与 Airflow Variables 混淆)
通过外部提供的、生成的 Python 代码,其中包含 DAG 文件夹中的元数据
通过外部提供的、生成的配置文件元数据文件,位于 DAG 文件夹中
动态 DAG 生成的一些案例在动态 DAG 生成部分中描述。
Airflow 变量¶
使用 Airflow Variables 会产生网络调用和数据库访问,因此应尽可能避免在 dag 的顶层 Python 代码中使用它们,正如上一章“顶层 Python 代码”中所述。如果必须在顶层 DAG 代码中使用 Airflow Variables,可以通过启用实验性缓存来减轻它们对 DAG 解析的影响,并配置合理的 ttl。
您可以在 Operator 的 execute()
方法内部自由使用 Airflow Variables,也可以通过 Jinja 模板将 Airflow Variables 传递给现有的 Operator,这会将读取值的操作延迟到任务执行时。
这样做的模板语法是
{{ var.value.<variable_name> }}
或者如果您需要从变量中反序列化一个 json 对象
{{ var.json.<variable_name> }}
在顶层代码中,使用 jinja 模板的变量直到任务运行时才会产生请求,而如果未启用缓存,调度器每次解析 dag 文件时 Variable.get()
都会产生请求。在未启用缓存的情况下使用 Variable.get()
将导致 dag 文件处理性能低下。在某些情况下,这可能导致 dag 文件在完全解析之前超时。
糟糕的示例
from airflow.sdk import Variable
foo_var = Variable.get("foo") # AVOID THAT
bash_use_variable_bad_1 = BashOperator(
task_id="bash_use_variable_bad_1", bash_command="echo variable foo=${foo_env}", env={"foo_env": foo_var}
)
bash_use_variable_bad_2 = BashOperator(
task_id="bash_use_variable_bad_2",
bash_command=f"echo variable foo=${Variable.get('foo')}", # AVOID THAT
)
bash_use_variable_bad_3 = BashOperator(
task_id="bash_use_variable_bad_3",
bash_command="echo variable foo=${foo_env}",
env={"foo_env": Variable.get("foo")}, # AVOID THAT
)
好的示例
bash_use_variable_good = BashOperator(
task_id="bash_use_variable_good",
bash_command="echo variable foo=${foo_env}",
env={"foo_env": "{{ var.value.get('foo') }}"},
)
@task
def my_task():
var = Variable.get("foo") # This is ok since my_task is called only during task run, not during DAG scan.
print(var)
出于安全目的,建议对任何包含敏感数据的变量使用 Secrets Backend。
时间表¶
避免在时间表代码的顶层使用 Airflow Variables/Connections 或访问 Airflow 数据库。数据库访问应延迟到 DAG 的执行时间。这意味着不应将变量/连接检索作为时间表类初始化的参数,也不应在自定义时间表模块的顶层使用 Variable/connection。
糟糕的示例
from airflow.sdk import Variable
from airflow.timetables.interval import CronDataIntervalTimetable
class CustomTimetable(CronDataIntervalTimetable):
def __init__(self, *args, something=Variable.get("something"), **kwargs):
self._something = something
super().__init__(*args, **kwargs)
好的示例
from airflow.sdk import Variable
from airflow.timetables.interval import CronDataIntervalTimetable
class CustomTimetable(CronDataIntervalTimetable):
def __init__(self, *args, something="something", **kwargs):
self._something = Variable.get(something)
super().__init__(*args, **kwargs)
更改后触发 DAG¶
避免在更改 dag 或 DAG 文件夹中的任何其他相关文件后立即触发 dag。
您应该给系统足够的时间来处理更改的文件。这需要几个步骤。首先,文件必须分发到调度器——通常通过分布式文件系统或 Git-Sync,然后调度器必须解析 Python 文件并将其存储在数据库中。根据您的配置、分布式文件系统的速度、文件数量、dag 数量、文件更改数量、文件大小、调度器数量、CPU 速度,这可能需要几秒到几分钟,在极端情况下甚至很多分钟。您应该等待您的 DAG 出现在 UI 中才能触发它。
如果您在更新它和准备触发它之间看到长时间延迟,您可以查看以下配置参数并根据您的需要进行微调(通过链接查看每个参数的详细信息)
带有触发规则的观察者模式示例¶
观察者模式是我们称呼一个 DAG 的方式,其中包含一个“观察”其他任务状态的任务。其主要目的是在任何其他任务失败时使 DAG Run 失败。这个需求来源于 Airflow 系统测试,这些测试是包含不同任务的 dag(类似于包含步骤的测试)。
通常,当任何任务失败时,所有其他任务都不会执行,并且整个 DAG Run 也会获得失败状态。但是当我们使用触发规则时,我们可以中断任务的正常执行流程,并且整个 DAG 可能呈现出与我们期望不同的状态。例如,我们可以有一个 teardown 任务(触发规则设置为 TriggerRule.ALL_DONE
),无论其他任务的状态如何,该任务都会执行(例如,用于清理资源)。在这种情况下,DAG 将始终运行此任务,并且 DAG Run 将获得此特定任务的状态,因此我们可能会丢失有关失败任务的信息。如果我们想确保带有 teardown 任务的 DAG 在任何任务失败时都会失败,我们需要使用观察者模式。观察者任务是一个任务,如果被触发,它总是会失败,但它只需要在任何其他任务失败时才会被触发。它需要将触发规则设置为 TriggerRule.ONE_FAILED
,并且还需要是 DAG 中所有其他任务的下游任务。因此,如果所有其他任务都通过,观察者将被跳过;但如果出现失败,观察者任务将执行并失败,导致整个 DAG Run 也失败。
注意
请注意,触发规则仅依赖于直接上游(父)任务,例如 TriggerRule.ONE_FAILED
将忽略不是参数化任务的直接父任务的任何失败(或 upstream_failed
)任务。
用示例更容易理解这个概念。假设我们有以下 DAG
from datetime import datetime
from airflow.sdk import DAG
from airflow.sdk import task
from airflow.exceptions import AirflowException
from airflow.providers.standard.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
raise AirflowException("Failing task because one or more upstream tasks failed.")
with DAG(
dag_id="watcher_example",
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", retries=0)
passing_task = BashOperator(task_id="passing_task", bash_command="echo passing_task")
teardown = BashOperator(
task_id="teardown",
bash_command="echo teardown",
trigger_rule=TriggerRule.ALL_DONE,
)
failing_task >> passing_task >> teardown
list(dag.tasks) >> watcher()
此 DAG 执行后的可视化表示如下所示

我们有几个任务,它们服务于不同的目的
failing_task
总是失败,passing_task
总是成功(如果执行),teardown
总是被触发(无论其他任务的状态如何),并且它应该总是成功,watcher
是其他每个任务的下游任务,即在任何任务失败时它会被触发,从而导致整个 DAG Run 失败,因为它是一个叶子任务。
值得注意的是,如果没有 watcher
任务,整个 DAG Run 将获得 success
状态,因为唯一的失败任务不是叶子任务,并且 teardown
任务将以 success
完成。如果我们希望 watcher
监控所有任务的状态,我们需要使其单独依赖于所有任务。因此,如果任何任务失败,我们可以使 DAG Run 失败。请注意,watcher 任务的触发规则设置为 "one_failed"
。另一方面,如果没有 teardown
任务,就不需要 watcher
任务,因为 failing_task
会将其 failed
状态传播到下游任务 passed_task
,并且整个 DAG Run 也会获得 failed
状态。
在集群策略中使用 AirflowClusterPolicySkipDag 异常跳过特定 DAG¶
版本 2.7 中添加。
Airflow dag 通常可以通过 git-sync
使用 Git 仓库的特定分支进行部署和更新。但是,当出于某些操作原因必须运行多个 Airflow 集群时,维护多个 Git 分支非常麻烦。特别是当您需要使用适当的分支策略定期同步两个独立的分支(例如 prod
和 beta
)时,会遇到一些困难。
cherry-pick
维护 Git 仓库过于繁琐。hard-reset
不建议用于 GitOps
因此,您可以考虑将多个 Airflow 集群连接到同一个 Git 分支(例如 main
),并通过不同的环境变量和具有相同 connection_id
的不同连接配置来维护它们。如果需要,您还可以在集群策略中引发 AirflowClusterPolicySkipDag
异常,以便仅在特定的 Airflow 部署中将特定的 dag 加载到 DagBag
中。
def dag_policy(dag: DAG):
"""Skipping the DAG with `only_for_beta` tag."""
if "only_for_beta" in dag.tags:
raise AirflowClusterPolicySkipDag(
f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
)
上面的示例显示了根据 DAG 标签跳过该 DAG 的 dag_policy
代码片段。
降低 DAG 复杂度¶
尽管 Airflow 善于处理大量具有许多任务和相互依赖关系的 dag,但当您有许多复杂的 dag 时,它们的复杂性可能会影响调度性能。保持 Airflow 实例高性能并得到充分利用的一种方法是尽可能简化和优化您的 dag - 您必须记住,DAG 解析和创建过程只是执行 Python 代码,使其尽可能高性能取决于您自己。没有“魔法秘方”能让您的 DAG “不那么复杂” - 因为这是 Python 代码,DAG 编写者控制着代码的复杂性。
没有关于 DAG 复杂度的“指标”,特别是没有指标可以告诉您您的 DAG 是否“足够简单”。然而,与任何 Python 代码一样,当您的 DAG 代码经过优化时,您可以肯定地说它“更简单”或“更快”。如果您想优化您的 dag,可以采取以下行动
让您的 DAG 加载更快。这是一个可以通过各种方式实现的改进建议,但它对调度器的性能影响最大。每当您有机会让 DAG 加载更快时,就去做,如果您的目标是提高性能。查看顶层 Python 代码以获取一些如何做到这一点的提示。还可以查看DAG 加载器测试,了解如何评估您的 DAG 加载时间。
让您的 DAG 生成更简单的结构。每个任务依赖都会为调度和执行增加额外的处理开销。具有简单线性结构
A -> B -> C
的 DAG 在任务调度中将比具有例如依赖任务数量呈指数级增长的深度嵌套树形结构的 DAG 经历更少的延迟。如果可以使您的 dag 更加线性 - 在执行的单个时间点上,任务中潜在的可运行候选者越少,这可能会提高整体调度性能。每个文件中的 dag 数量更少。尽管 Airflow 2 针对单个文件中包含多个 dag 的情况进行了优化,但系统的某些部分有时会使其性能较低,或引入比将这些 dag 分散到多个文件中更多的延迟。仅仅一个文件只能由一个 FileProcessor 解析这一事实就使其可伸缩性降低,例如。如果您从一个文件生成了许多 dag,如果您发现将更改反映到 Airflow UI 中的 DAG 文件需要很长时间,请考虑将它们拆分。
编写高效的 Python 代码。必须在每个文件中的 dag 数量较少(如上所述)和总体代码量较少之间取得平衡。创建描述 dag 的 Python 文件应遵循最佳编程实践,而不应将其视为配置。如果您的 dag 共享相似的代码,则不应将其一遍又一遍地复制到大量几乎相同的源文件中,因为这将导致不必要的重复导入相同的资源。相反,您的目标应是尽量减少所有 dag 中的重复代码,以便应用程序可以高效运行且易于调试。请参阅代码质量与 Linting,了解如何创建具有相似代码的多个 dag。
测试 DAG¶
Airflow 用户应将 dag 视为生产级代码,并且 dag 应具有各种相关的测试以确保它们产生预期结果。您可以为 DAG 编写各种各样的测试。让我们看看其中的一些。
DAG 加载器测试¶
此测试应确保您的 DAG 不包含在加载时引发错误的代码。用户无需编写额外代码即可运行此测试。
python your-dag-file.py
运行上述命令而没有错误可确保您的 DAG 不包含任何未安装的依赖项、语法错误等。请确保您在与调度器环境相对应的环境中加载您的 DAG - 具有相同的依赖项、环境变量以及从 DAG 中引用的通用代码。
如果您想尝试优化 DAG 加载时间,这也是检查您的 DAG 在优化后是否加载得更快的好方法。只需运行 DAG 并测量所需时间即可,但再次需要确保您的 DAG 在相同的依赖项、环境变量和通用代码下运行。
测量处理时间的方法有很多,其中一种在 Linux 环境中是使用内置的 time
命令。请务必连续运行几次以考虑缓存效应。比较优化之前和之后的结果(在相同条件下 - 使用相同的机器、环境等),以评估优化的影响。
time python airflow/example_dags/example_python_operator.py
结果
real 0m0.699s
user 0m0.590s
sys 0m0.108s
重要的指标是“实际时间”(real time
) - 它告诉您处理 DAG 所花费的时间。请注意,以这种方式加载文件时,您正在启动一个新的解释器,因此会有一个初始加载时间,这在 Airflow 解析 DAG 时是不存在的。您可以通过运行以下命令评估初始化时间
time python -c ''
结果
real 0m0.073s
user 0m0.037s
sys 0m0.039s
在这种情况下,初始解释器启动时间约为 0.07秒,这大约是解析上方 example_python_operator.py所需时间的 10%,因此示例 DAG 的实际解析时间约为 0.62 秒。
你可以查看 测试 DAG,了解如何测试单个 Operator 的详细信息。
单元测试¶
单元测试确保你的 DAG 中没有不正确的代码。你可以为你的任务和 DAG 编写单元测试。
加载 DAG 的单元测试
import pytest
from airflow.models import DagBag
@pytest.fixture()
def dagbag():
return DagBag()
def test_dag_loaded(dagbag):
dag = dagbag.get_dag(dag_id="hello_world")
assert dagbag.import_errors == {}
assert dag is not None
assert len(dag.tasks) == 1
单元测试 DAG 结构:这是一个示例测试,用于根据 dict 对象验证代码生成的 DAG 的结构
def assert_dag_dict_equal(source, dag):
assert dag.task_dict.keys() == source.keys()
for task_id, downstream_list in source.items():
assert dag.has_task(task_id)
task = dag.get_task(task_id)
assert task.downstream_task_ids == set(downstream_list)
def test_dag():
assert_dag_dict_equal(
{
"DummyInstruction_0": ["DummyInstruction_1"],
"DummyInstruction_1": ["DummyInstruction_2"],
"DummyInstruction_2": ["DummyInstruction_3"],
"DummyInstruction_3": [],
},
dag,
)
自定义 Operator 的单元测试
import datetime
import pendulum
import pytest
from airflow.sdk import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"
TEST_RUN_ID = "my_custom_operator_dag_run"
@pytest.fixture()
def dag():
with DAG(
dag_id=TEST_DAG_ID,
schedule="@daily",
start_date=DATA_INTERVAL_START,
) as dag:
MyCustomOperator(
task_id=TEST_TASK_ID,
prefix="s3://bucket/some/prefix",
)
return dag
def test_my_custom_operator_execute_no_trigger(dag):
dagrun = dag.create_dagrun(
run_id=TEST_RUN_ID,
logical_date=DATA_INTERVAL_START,
data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.TIMETABLE,
state=DagRunState.RUNNING,
start_date=DATA_INTERVAL_END,
)
ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
ti.task = dag.get_task(task_id=TEST_TASK_ID)
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS
# Assert something related to tasks results.
自检¶
你还可以在 DAG 中实现检查,以确保任务产生预期结果。例如,如果你有一个将数据推送到 S3 的任务,你可以在下一个任务中实现一个检查。例如,该检查可以确保在 S3 中创建了分区,并执行一些简单检查来确定数据是否正确。
同样,如果你有一个任务在 Kubernetes 或 Mesos 中启动微服务,你应该使用 airflow.providers.http.sensors.http.HttpSensor
检查服务是否已启动。
task = PushToS3(...)
check = S3KeySensor(
task_id="check_parquet_exists",
bucket_key="s3://bucket/key/foo.parquet",
poke_interval=0,
timeout=0,
)
task >> check
暂存环境¶
如果可能,在部署到生产环境之前保留一个暂存环境来测试完整的 DAG 运行。确保你的 DAG 已参数化以更改变量,例如 S3 操作的输出路径或用于读取配置的数据库。不要在 DAG 内部硬编码值,然后根据环境手动更改它们。
你可以使用环境变量来参数化 DAG。
import os
dest = os.environ.get("MY_DAG_DEST_PATH", "s3://default-target/path/")
模拟变量和连接¶
当你为使用变量或连接的代码编写测试时,必须确保它们在你运行测试时存在。显而易见的解决方案是将这些对象保存到数据库,以便在代码执行时可以读取它们。然而,读写数据库对象会带来额外的时间开销。为了加快测试执行速度,值得在不将这些对象保存到数据库的情况下模拟它们的存在。为此,你可以使用 unittest.mock.patch.dict()
模拟 os.environ
来创建环境变量。
对于变量,使用 AIRFLOW_VAR_{KEY}
。
with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
assert "env-value" == Variable.get("key")
对于连接,使用 AIRFLOW_CONN_{CONN_ID}
。
conn = Connection(
conn_type="gcpssh",
login="cat",
host="conn-host",
)
conn_uri = conn.get_uri()
with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
assert "cat" == Connection.get_connection_from_secrets("my_conn").login
元数据数据库维护¶
随着时间的推移,元数据数据库的存储空间会增加,因为会积累更多的 DAG 运行、任务运行和事件日志。
你可以使用 Airflow CLI 通过命令 airflow db clean
清除旧数据。
有关更多详细信息,请参阅 db clean 用法。
升级和降级¶
备份你的数据库¶
在进行任何修改数据库的操作之前,备份元数据数据库始终是明智之举。
禁用调度器¶
在执行此类维护时,你可以考虑禁用 Airflow 集群。
一种方法是将参数 [scheduler] > use_job_schedule
设置为 False
并等待任何正在运行的 DAG 完成;之后除非外部触发,否则不会创建新的 DAG 运行。
一个更好的方法(虽然更手动一些)是使用 dags pause
命令。你需要记录在开始此操作之前已暂停的 DAG 列表,以便在维护完成后知道要取消暂停哪些。首先运行 airflow dags list
并存储未暂停的 DAG 列表。然后使用同一列表在维护前对每个 DAG 运行 dags pause
,在维护后运行 dags unpause
。这样做的一个好处是,你可以在升级后尝试只取消暂停一两个 DAG(可能专门用于 测试 DAG),以确保一切正常工作,然后再全部开启。
添加“集成测试”DAG¶
添加几个使用生态系统中所有常见服务(例如 S3、Snowflake、Vault)但使用测试资源或“dev”帐户的“集成测试”DAG 会很有帮助。这些测试 DAG 可以在升级后首先启用,因为如果它们失败,也没关系,并且你可以回滚到备份而没有负面影响。但是,如果它们成功,它们应该证明你的集群能够运行使用你所需库和服务所需的任务。
例如,如果你使用外部 secrets 后端,请确保有一个任务可以检索连接。如果你使用 KubernetesPodOperator,添加一个运行 sleep 30; echo "hello"
的任务。如果你需要写入 S3,请在测试任务中执行。如果你需要访问数据库,添加一个从服务器执行 select 1
的任务。
升级前清理数据¶
有些数据库迁移可能很耗时。如果你的元数据数据库非常大,请考虑在执行升级之前使用 db clean 命令清理一些旧数据。请谨慎使用。
处理冲突/复杂的 Python 依赖¶
Airflow 有许多 Python 依赖项,有时 Airflow 依赖项会与你的任务代码期望的依赖项冲突。由于 Airflow 环境默认只有一组 Python 依赖项和一个 Python 环境,因此通常也可能出现某些任务需要与其他任务不同的依赖项,并且这些任务之间的依赖项基本上是冲突的。
如果你使用预定义的 Airflow Operator 与外部服务进行通信,选择不多,但通常这些 Operator 的依赖项不会与基本的 Airflow 依赖项冲突。Airflow 使用约束机制,这意味着你有一组“固定”的依赖项,社区保证 Airflow(包括所有社区 provider)安装时不会触发冲突。然而,你可以独立升级 provider,它们的约束不会限制你,因此发生依赖项冲突的可能性较低(你仍然需要测试这些依赖项)。因此,当你使用预定义的 Operator 时,很可能很少或根本不会遇到依赖项冲突问题。
然而,当你以更“现代的方式”使用 Airflow,即使用 TaskFlow Api 并且你的大多数 Operator 是使用自定义 Python 代码编写的,或者当你想要编写自己的自定义 Operator 时,你可能会遇到自定义代码所需的依赖项与 Airflow 的依赖项冲突的情况,甚至是你的一些自定义 Operator 的依赖项之间也会产生冲突。
有许多策略可以用来缓解这个问题。虽然处理自定义 Operator 中的依赖冲突很困难,但在使用 airflow.providers.standard.operators.python.PythonVirtualenvOperator
或 airflow.providers.standard.operators.python.ExternalPythonOperator
时就容易得多了——无论是直接使用经典的“operator”方法,还是如果你使用 TaskFlow,通过使用带有 @task.virtualenv
或 @task.external_python
装饰器的任务。
让我们从最容易实现的策略(有一些限制和开销)开始,然后逐步介绍那些需要对你的 Airflow 部署进行一些更改的策略。
使用 PythonVirtualenvOperator¶
这是最简单易用但限制最多的策略。PythonVirtualenvOperator 允许你动态创建一个虚拟环境来执行你的 Python 可调用函数。在 使用 TaskFlow API 构建 Python 式 DAG 中描述的现代 TaskFlow 方法中,这也可以通过使用 @task.virtualenv
装饰器来装饰你的可调用对象(推荐使用该 Operator 的方式)来实现。每个 airflow.providers.standard.operators.python.PythonVirtualenvOperator
任务都可以拥有自己独立的 Python 虚拟环境(每次任务运行时动态创建),并且可以指定执行该任务所需安装的细粒度要求集。
该 Operator 负责处理
根据你的环境创建虚拟环境
序列化你的 Python 可调用对象并将其传递给虚拟环境 Python 解释器执行
执行它并检索可调用对象的结果,如果指定了则通过 xcom 推送结果
该 Operator 的优点是
无需提前准备虚拟环境。它将在任务运行前动态创建,并在完成后移除,因此利用多个虚拟环境无需做任何特别的事情(除了在你的 Airflow 依赖项中包含 virtualenv 包)
你可以在同一个 Worker 上运行具有不同依赖项集的任务——因此内存资源得到重用(尽管请参阅下文关于创建虚拟环境涉及的 CPU 开销)。
在较大的安装中,DAG 作者无需请任何人为你创建虚拟环境。作为 DAG 作者,你只需安装 virtualenv 依赖项,即可根据需要指定和修改环境。
部署要求不变——无论你使用本地虚拟环境、Docker 还是 Kubernetes,任务都可以工作,无需在部署中添加任何内容。
作为 DAG 作者,无需学习更多关于容器、Kubernetes 的知识。只需了解 Python 要求即可用这种方式编写 DAG。
该 Operator 引入了一些限制和开销
你的 Python 可调用对象必须可序列化。许多 Python 对象无法使用标准的
pickle
库序列化。你可以通过使用dill
库来缓解其中一些限制,但即使是该库也无法解决所有序列化限制。Airflow 环境中不可用的所有依赖项必须在你使用的可调用对象中本地导入,并且你的 DAG 的顶级 Python 代码不应导入/使用这些库。
虚拟环境在同一个操作系统中运行,因此它们不能有冲突的系统级依赖项(可使用
apt
或yum
安装的包)。只有 Python 依赖项可以在这些环境中独立安装。该 Operator 为运行每个任务增加了 CPU、网络和耗时开销——Airflow 必须为每个任务从头重新创建虚拟环境
Worker 需要访问 PyPI 或私有仓库来安装依赖项
虚拟环境的动态创建容易出现瞬时故障(例如当你的仓库不可用或访问仓库时出现网络问题)
很容易陷入“过于”动态的环境——因为你安装的依赖项可能会升级,并且它们的传递依赖项可能会独立升级,你最终可能会遇到这样的情况:你的任务停止工作,因为有人发布了新版本的依赖项,或者你可能会成为“供应链”攻击的受害者,新版本的依赖项可能会变得恶意。
任务仅通过在不同环境中运行而相互隔离。这使得运行中的任务仍然可能相互干扰——例如,在同一 Worker 上执行的后续任务可能会受到先前创建/修改文件等任务的影响。
你可以在 Taskflow API 教程中的这一部分 查看使用 airflow.providers.standard.operators.python.PythonVirtualenvOperator
的详细示例。
使用 ExternalPythonOperator¶
自 2.4 版本添加。
使用 airflow.providers.standard.operators.python.ExternalPythonOperator`
会稍微复杂一些,但能显著减少开销、安全和稳定性问题。在 使用 TaskFlow API 构建 Python 式 DAG 中描述的现代 TaskFlow 方法中,这也可以通过使用 @task.external_python
装饰器装饰你的可调用对象(推荐使用该 Operator 的方式)来实现。然而,它要求你有一个预先存在、不可变的 Python 环境,该环境需要提前准备好。与 airflow.providers.standard.operators.python.PythonVirtualenvOperator
不同的是,你无法向此类预先存在的环境添加新的依赖项。你所需的所有依赖项都应预先添加到你的环境中,并且在你的 Airflow 在分布式环境中运行时,所有 Worker 都应可用。
通过这种方式,你可以避免重新创建虚拟环境带来的开销和问题,但它们必须与 Airflow 安装一起准备和部署。通常需要管理 Airflow 安装的人员参与,在大型安装中,这些人通常与 DAG 作者不同(运维/系统管理员)。
这些虚拟环境可以用各种方式准备——如果你使用 LocalExecutor,只需将它们安装在运行调度器的机器上;如果你使用分布式 Celery 虚拟环境安装,应该有一个管道将这些虚拟环境安装到多台机器上;最后,如果你使用 Docker 镜像(例如通过 Kubernetes),则应将虚拟环境创建添加到你的自定义镜像构建管道中。
该 Operator 的优点是
运行任务时没有设置开销。开始运行任务时,虚拟环境已准备就绪。
你可以在同一 Worker 上运行具有不同依赖项集的任务——因此所有资源都得到重用。
Worker 无需访问 PyPI 或私有仓库。由网络引起的瞬时错误的可能性较小。
依赖项可以由管理员和你的安全团队预先审查,不会动态添加意料之外的新代码。这对于安全性和稳定性都有益处。
对你的部署影响有限——你无需切换到 Docker 容器或 Kubernetes 即可很好地使用该 Operator。
作为 DAG 作者,无需学习更多关于容器、Kubernetes 的知识。只需了解 Python、requirements 即可用这种方式编写 DAG。
缺点
你的环境需要提前准备好虚拟环境。这通常意味着你无法即时更改它,添加新的或更改 requirement 至少需要重新部署 Airflow,并且开发新版本时的迭代时间可能会更长。
你的 Python 可调用对象必须可序列化。许多 Python 对象无法使用标准的
pickle
库序列化。你可以通过使用dill
库来缓解其中一些限制,但即使是该库也无法解决所有序列化限制。Airflow 环境中不可用的所有依赖项必须在你使用的可调用对象中本地导入,并且你的 DAG 的顶级 Python 代码不应导入/使用这些库。
虚拟环境在同一个操作系统中运行,因此它们不能有冲突的系统级依赖项(可使用
apt
或yum
安装的包)。只有 Python 依赖项可以在这些环境中独立安装任务仅通过在不同环境中运行而相互隔离。这使得运行中的任务仍然可能相互干扰——例如,在同一 Worker 上执行的后续任务可能会受到先前创建/修改文件等任务的影响。
你可以将 PythonVirtualenvOperator
和 ExternalPythonOperator
视为对应物,它们使从开发阶段到生产阶段的过渡更加顺畅。作为 DAG 作者,你通常会使用 PythonVirtualenvOperator
(因此使用 @task.virtualenv
装饰器装饰你的任务)进行依赖项迭代和 DAG 开发,而在迭代和更改之后,你很可能希望为了生产环境将其切换到 ExternalPythonOperator
(以及 @task.external_python
),在你公司的 DevOps/系统管理员团队在生产环境的预先存在的虚拟环境中部署你的新依赖项之后。这样做的好处是,你可以在任何时候切换回装饰器,并继续使用 PythonVirtualenvOperator
“动态地”开发它。
你可以在 Taskflow 外部 Python 示例 查看使用 airflow.providers.standard.operators.python.ExternalPythonOperator
的详细示例
使用 DockerOperator 或 Kubernetes Pod Operator¶
这些要求 Airflow 能够访问 Docker 引擎或 Kubernetes 集群。
与 Python Operator 类似,如果你想使用这些 Operator 来执行你的可调用 Python 代码,TaskFlow 装饰器也会非常方便。
然而,它要复杂得多——如果你想使用这种方法,你需要理解 Docker/Kubernetes Pod 的工作原理,但任务彼此完全隔离,而且你甚至不限于运行 Python 代码。你可以用任何你想用的编程语言编写任务。此外,你的依赖项完全独立于 Airflow 的依赖项(包括系统级依赖项),因此如果你的任务需要一个非常不同的环境,这是最佳选择。
自 2.2 版本添加。
从 Airflow 2.2 版本开始,你可以使用 @task.docker
装饰器来使用 DockerOperator
运行你的函数。
自 2.4 版本添加。
从 Airflow 2.2 版本开始,你可以使用 @task.kubernetes
装饰器来使用 KubernetesPodOperator
运行你的函数。
使用这些 Operator 的优点是
你可以运行具有不同 Python 和系统级依赖项的任务集,甚至可以使用完全不同的语言或不同的处理器架构(x86 vs. arm)编写的任务。
用于运行任务的环境受益于容器的优化和不可变性,其中相似的依赖项集可以有效地重用镜像的许多缓存层,因此该环境针对具有多个相似但不同的环境的情况进行了优化。
依赖项可以由管理员和你的安全团队预先审查,不会动态添加意料之外的新代码。这对于安全性和稳定性都有益处。
任务之间完全隔离。除了使用标准的 Airflow XCom 机制外,它们不能以其他方式相互影响。
缺点
启动任务存在开销。通常不像动态创建虚拟环境时那么大,但仍然显著(特别是对于
KubernetesPodOperator
)。在使用 TaskFlow 装饰器的情况下,需要将整个要调用的方法序列化并发送到 Docker 容器或 Kubernetes Pod,并且对方法的大小存在系统级限制。在远程端序列化、发送以及最后反序列化方法也会增加开销。
存在多个进程所需的资源开销。使用这两个 Operator 运行任务至少需要两个进程——一个执行任务的进程(在 Docker 容器或 Kubernetes Pod 中运行),以及一个在 Airflow Worker 中负责向 Docker/Kubernetes 提交作业并监控执行的监督进程。
你的环境需要提前准备好容器镜像。这通常意味着你无法即时更改它们。添加系统依赖项、修改或更改 Python requirements 需要重新构建和发布镜像(通常在你的私有 registry 中)。开发新依赖项时的迭代时间通常更长,如果开发人员在迭代过程中更改依赖项,则需要构建和使用自己的镜像。在这种情况下,一个合适的部署管道对于可靠地维护你的部署至关重要。
如果你想通过装饰器运行你的 Python 可调用对象,它必须可序列化,在这种情况下,Airflow 环境中不可用的所有依赖项也必须在你使用的可调用对象中本地导入,并且你的 DAG 的顶级 Python 代码不应导入/使用那些库。
你需要更深入地了解 Docker 容器或 Kubernetes 的工作原理。它们提供的抽象是“有漏洞的”(leaky),因此你需要了解更多关于资源、网络、容器等方面的信息,才能编写使用这些 Operator 的 DAG。
你可以在 Taskflow Docker 示例 和 Taskflow Kubernetes 示例 查看使用 airflow.operators.providers.Docker
和 airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator
的详细示例
使用多个 Docker 镜像和 Celery 队列¶
虽然需要深入了解 Airflow 部署,但有可能使用多个独立的 Docker 镜像运行 Airflow 任务。这可以通过将不同的任务分配到不同的队列,并配置你的 Celery Worker 为不同的队列使用不同的镜像来实现。然而,这(至少目前)需要大量的手动部署配置以及对 Airflow、Celery 和 Kubernetes 工作原理的内在了解。此外,它为运行任务带来了相当大的开销——资源重用的机会较少,并且很难在不影响性能和稳定性的情况下精细调整此类部署以控制资源成本。
使其更有用的一种可能方式是 AIP-46 Airflow 任务和 DAG 解析的运行时隔离,以及完成 AIP-43 DAG Processor 分离。在这些实现之前,使用这种方法的益处非常少,并且不推荐。
然而,当这些 AIP 实现后,这将开启一种更具多租户可行性的方法,多个团队将能够拥有完全隔离的依赖项集,这些依赖项将在 DAG 的整个生命周期中使用——从解析到执行。