最佳实践

创建新 DAG 是一个三步过程:

  • 编写 Python 代码以创建 DAG 对象,

  • 测试代码是否符合预期,

  • 配置环境依赖以运行您的 DAG。

本教程将向您介绍这三个步骤的最佳实践。

编写 DAG

在 Airflow 中创建新 DAG 非常简单。但是,您需要注意许多事项,以确保 DAG 的运行或失败不会产生意外结果。

创建自定义 Operator/Hook

请遵循我们关于 自定义 Operator 的指南。

创建任务

您应该将 Airflow 中的任务视为数据库中的事务。这意味着您绝不应该从任务中产生不完整的结果。例如,不要在任务结束时在 HDFSS3 中产生不完整的数据。

Airflow 可以在任务失败时重试。因此,任务在每次重新运行时应产生相同的结果。避免产生不同结果的一些方法包括:

  • 在任务重试期间不要使用 INSERT,INSERT 语句可能会导致数据库中出现重复行。请使用 UPSERT 替换它。

  • 在特定的分区中进行读写。永远不要在任务中读取最新的可用数据。因为有人可能会在重试之间更新输入数据,从而导致不同的输出。更好的方法是读取特定分区中的输入数据。您可以使用 data_interval_start 作为分区。在将数据写入 S3/HDFS 时,也应该遵循这种分区方法。

  • Python 的 datetime.now() 函数会给出当前的日期时间对象。该函数不应在任务内部使用,特别是用于关键计算,因为它会导致每次运行的结果不同。例如,用它来生成临时日志是可以的。

提示

您应该在 default_args 中定义重复的参数(如 connection_id 或 S3 路径),而不是为每个任务单独声明。 default_args 有助于避免拼写错误等问题。此外,大多数连接类型在任务中都有唯一的参数名称,因此您只需在 default_args 中声明一次连接(例如 gcp_conn_id),它就会被使用该连接类型的所有 Operator 自动使用。

删除任务

从 DAG 中删除任务时要小心。您将无法在 Graph 视图、Grid 视图等中看到该任务,这使得从 Web 服务器检查该任务的日志变得困难。如果不希望这样,请创建一个新的 DAG。

通信

如果您使用的是 Kubernetes 执行器Celery 执行器,Airflow 会在不同的服务器上执行 DAG 的任务。因此,您不应该在本地文件系统中存储任何文件或配置,因为下一个任务很可能会在不同的服务器上运行而无法访问它——例如,下载后续任务需要处理的数据文件的任务。在使用 Local executor 的情况下,将文件存储在磁盘上会使重试变得困难,例如,您的任务需要一个配置文件,但该文件被 DAG 中的另一个任务删除了。

如果可能,请使用 XCom 在任务之间传递小消息,而在任务之间传递较大数据的有效方式是使用远程存储(如 S3/HDFS)。例如,如果我们有一个将处理后的数据存储在 S3 中的任务,该任务可以将输出数据的 S3 路径推送到 Xcom,下游任务可以从 XCom 拉取该路径并使用它来读取数据。

任务也不应在内部存储任何身份验证参数,如密码或令牌。尽可能使用 连接 (Connections) 将数据安全地存储在 Airflow 后端,并使用唯一的连接 ID 检索它们。

顶级 Python 代码

您应该避免编写创建 Operator 和构建 DAG 关系所不需要的顶级代码。这是因为 Airflow 调度器的设计决策,以及顶级代码解析速度对 Airflow 的性能和可扩展性有直接影响。

Airflow 调度器会以至少 min_file_process_interval 秒的间隔执行 Operator 的 execute 方法之外的代码。这样做是为了允许 DAG 的动态调度——调度和依赖关系可能会随时间变化,并影响 DAG 的下一次调度。Airflow 调度器会持续尝试确保 DAG 中的内容正确反映在已调度的任务中。

特别地,您不应该运行任何数据库访问、繁重的计算和网络操作。

影响 DAG 加载时间的另一个重要因素(可能被 Python 开发人员忽视)是顶级导入可能需要出乎意料的长执行时间,并产生大量的开销。这可以通过将它们转换为 Python 可调用对象内部的局部导入来轻松避免。

考虑以下两个示例。在第一个示例中,DAG 的解析时间将比第二个功能等效的 DAG 多 1000 秒,在第二个示例中,expensive_api_call 是在其任务上下文中执行的。

不避免顶级 DAG 代码

import pendulum

from airflow.sdk import DAG
from airflow.sdk import task


def expensive_api_call():
    print("Hello from Airflow!")
    sleep(1000)


my_expensive_response = expensive_api_call()

with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        print(my_expensive_response)

避免顶级 DAG 代码

import pendulum

from airflow.sdk import DAG
from airflow.sdk import task


def expensive_api_call():
    sleep(1000)
    return "Hello from Airflow!"


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        my_expensive_response = expensive_api_call()
        print(my_expensive_response)

在第一个示例中,expensive_api_call 在每次解析 DAG 文件时都会执行,这将导致 DAG 文件处理的性能不佳。在第二个示例中,expensive_api_call 仅在任务运行时被调用,因此可以解析而不受性能影响。您可以自行测试,实现第一个 DAG,然后查看调度器日志中打印的“Hello from Airflow!”!

请注意,import 语句也算作顶级代码。因此,如果您有一个耗时的 import 语句,或者导入的模块本身在顶级执行代码,这也会影响调度器的性能。以下示例说明了如何处理昂贵的导入。

# It's ok to import modules that are not expensive to load at top-level of a Dag file
import random
import pendulum

# Expensive imports should be avoided as top level imports, because Dag files are parsed frequently, resulting in top-level code being executed.
#
# import pandas
# import torch
# import tensorflow
#

...


@task()
def do_stuff_with_pandas_and_torch():
    import pandas
    import torch

    # do some operations using pandas and torch


@task()
def do_stuff_with_tensorflow():
    import tensorflow

    # do some operations using tensorflow

如何检查代码是否为“顶级”代码

为了理解您的代码是否为“顶级”,您需要了解 Python 解析工作方式的许多细节。通常,当 Python 解析 python 文件时,它会执行所见的代码,除了(通常)它不执行的方法内部代码。

它有许多不明显的特殊情况——例如,顶级代码也意味着用于确定方法默认值的任何代码。

但是,有一种简单的方法可以检查您的代码是否为“顶级”。您只需要解析代码并查看该代码片段是否被执行即可。

想象这段代码:

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum


def get_task_id():
    return "print_array_task"  # <- is that code going to be executed?


def get_array():
    return [1, 2, 3]  # <- is that code going to be executed?


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    operator = PythonOperator(
        task_id=get_task_id(),
        python_callable=get_array,
        dag=dag,
    )

您可以做的是在要检查的代码中添加一些打印语句,然后运行 python <my_dag_file>.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum


def get_task_id():
    print("Executing 1")
    return "print_array_task"  # <- is that code going to be executed? YES


def get_array():
    print("Executing 2")
    return [1, 2, 3]  # <- is that code going to be executed? NO


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    operator = PythonOperator(
        task_id=get_task_id(),
        python_callable=get_array,
        dag=dag,
    )

当您执行该代码时,您会看到:

[Breeze:3.10.19] root@cf85ab34571e:/opt/airflow# python /files/test_python.py
Executing 1

这意味着 get_array 没有作为顶级代码执行,但 get_task_id 执行了。

代码质量与 Linting

保持高质量的代码对于 Airflow 工作流的可靠性和可维护性至关重要。以下几点总结了其与 Ruff 的关系:

  1. 本页面记录了 Airflow 的最佳实践。其中一些实践也得到了 Ruff AIR 规则的支持,这些规则有助于检测和强制执行 Airflow 特定的最佳实践,包括已弃用的模式和迁移问题。完整列表可在 Airflow (AIR) 中查看。

  2. 如果您想提出新的 Airflow 最佳实践并添加匹配的 Ruff AIR 规则,请按照 Proposing Airflow Best Practices and Ruff AIR Rules 中描述的贡献流程进行操作。

安装和使用 ruff

  1. 安装:使用 pip 安装 ruff

    pip install "ruff>=0.15.8"
    
  2. 运行 ruff:执行 ruff 以检查您的 DAG 是否存在潜在问题

    ruff check dags/ --select AIR3
    

    此命令将分析您位于 dags/ 目录中的 DAG,并报告与指定规则相关的任何问题。

示例

假设有一个定义的旧版 DAG:

from airflow import dag
from airflow.datasets import Dataset
from airflow.sensors.filesystem import FileSensor


@dag()
def legacy_dag():
    FileSensor(task_id="wait_for_file", filepath="/tmp/test_file")

运行 ruff 将产生:

dags/legacy_dag.py:7:2: AIR301 Dag should have an explicit schedule argument
dags/legacy_dag.py:12:6: AIR302 schedule_interval is removed in Airflow 3.0
dags/legacy_dag.py:17:15: AIR302 airflow.datasets.Dataset is removed in Airflow 3.0
dags/legacy_dag.py:19:5: AIR303 airflow.sensors.filesystem.FileSensor is moved into ``standard`` provider in Airflow 3.0

通过将 ruff 集成到您的开发工作流中,您可以主动解决废弃问题并保持代码质量,从而促进 Airflow 版本之间的平滑过渡。

动态 DAG 生成

有时手动编写 DAG 不切实际。也许您有许多相似但仅参数不同的 DAG。或者您需要一组 DAG 来加载表,但不想在这些表发生变化时手动更新 DAG。在这些和其他情况下,动态生成 DAG 可能更有用。

避免在顶级代码中进行过度处理(上一章所述)在动态 DAG 配置的情况下尤为重要,该配置基本上可以通过以下方式之一进行配置:

  • 通过 环境变量(不要与 Airflow 变量 混淆)

  • 通过外部提供的、生成的包含 Dag 文件夹中元数据的 Python 代码

  • 通过外部提供的、生成的 Dag 文件夹中的配置元数据文件

动态 DAG 生成的一些案例在 动态 DAG 生成 部分进行了描述。

Airflow 变量

使用 Airflow 变量会产生网络调用和数据库访问,因此应尽可能避免在 DAG 的顶级 Python 代码中使用它们(如上一章 顶级 Python 代码 中所述)。如果必须在顶级 DAG 代码中使用 Airflow 变量,则可以通过 启用实验性缓存 并配置合理的 ttl 来减轻它们对 DAG 解析的影响。

您可以在 Operator 的 execute() 方法中自由使用 Airflow 变量,但您也可以通过 Jinja 模板将 Airflow 变量传递给现有的 Operator,这将延迟读取值直到任务执行。

执行此操作的模板语法为:

{{ var.value.<variable_name> }}

或者如果您需要从变量中反序列化 json 对象:

{{ var.json.<variable_name> }}

在顶级代码中,使用 jinja 模板的变量在任务运行前不会产生请求,而 Variable.get() 在未启用缓存的情况下,每次调度器解析 DAG 文件时都会产生一个请求。在未 启用缓存 的情况下使用 Variable.get() 将导致 DAG 文件处理性能不佳。在某些情况下,这可能导致 DAG 文件在完全解析之前超时。

错误示例:

from airflow.sdk import Variable

foo_var = Variable.get("foo")  # AVOID THAT
bash_use_variable_bad_1 = BashOperator(
    task_id="bash_use_variable_bad_1", bash_command="echo variable foo=${foo_env}", env={"foo_env": foo_var}
)

bash_use_variable_bad_2 = BashOperator(
    task_id="bash_use_variable_bad_2",
    bash_command=f"echo variable foo=${Variable.get('foo')}",  # AVOID THAT
)

bash_use_variable_bad_3 = BashOperator(
    task_id="bash_use_variable_bad_3",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": Variable.get("foo")},  # AVOID THAT
)

正确示例:

bash_use_variable_good = BashOperator(
    task_id="bash_use_variable_good",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": "{{ var.value.get('foo') }}"},
)
@task
def my_task():
    var = Variable.get("foo")  # This is ok since my_task is called only during task run, not during Dag scan.
    print(var)

出于安全目的,建议您对任何包含敏感数据的变量使用 Secrets 后端

时间表 (Timetables)

避免在时间表代码的顶级使用 Airflow 变量/连接或访问 Airflow 数据库。数据库访问应延迟到 DAG 的执行时间。这意味着您不应将变量/连接检索作为时间表类初始化的参数,也不应在自定义时间表模块的顶级放置变量/连接。

错误示例:

from airflow.sdk import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something=Variable.get("something"), **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)

正确示例:

from airflow.sdk import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something="something", **kwargs):
        self._something = Variable.get(something)
        super().__init__(*args, **kwargs)

变更后触发 DAG

避免在更改 DAG 或 DAG 文件夹中任何其他关联文件后立即触发 DAG。

您应该给系统留出足够的时间来处理更改后的文件。这需要几个步骤:首先,文件必须分发到调度器——通常通过分布式文件系统或 Git-Sync;然后,调度器必须解析 Python 文件并将其存储在数据库中。根据您的配置、分布式文件系统的速度、文件数量、DAG 数量、文件更改数量、文件大小、调度器数量、CPU 速度,这可能需要几秒到几分钟不等,在极端情况下甚至需要几分钟。您应该等待 DAG 出现在 UI 中,然后再触发它。

如果您发现更新它到可以触发的时间之间存在较长的延迟,您可以查看以下配置参数并根据需要进行微调(通过点击链接查看每个参数的详细信息):

使用触发规则的监控模式示例

监控模式(Watcher pattern)是我们调用带有“监视”其他任务状态任务的 DAG 的方式。其主要目的是在任何其他任务失败时使 DAG 运行失败。这一需求来自 Airflow 系统测试,这些测试是带有不同任务的 DAG(类似于包含步骤的测试)。

通常,当任何任务失败时,所有其他任务都不会执行,整个 DAG 运行也会获得失败状态。但是,当我们使用触发规则时,我们可以破坏运行任务的正常流程,整个 DAG 可能会呈现出我们预期的不同状态。例如,我们可以有一个清理任务(trigger rule 设置为 TriggerRule.ALL_DONE),它无论其他任务的状态如何都会被执行(例如清理资源)。在这种情况下,DAG 总是会运行这个任务,并且 DAG 运行将获得这个特定任务的状态,因此我们可能会丢失有关失败任务的信息。如果我们想确保带有清理任务的 DAG 在任何任务失败时都会失败,我们需要使用监控模式。监控任务是一个如果触发总是会失败的任务,但它只需要在任何其他任务失败时才被触发。它需要将触发规则设置为 TriggerRule.ONE_FAILED,并且它还需要成为 DAG 中所有其他任务的下游任务。这样,如果其他所有任务都通过了,监控器将被跳过;但当有任务失败时,监控任务将被执行并失败,从而导致整个 DAG 运行失败。

注意

请注意,触发规则仅依赖于直接上游(父)任务,例如 TriggerRule.ONE_FAILED 将忽略非参数化任务直接父级的任何失败(或 upstream_failed)任务。

通过一个例子更容易理解这个概念。假设我们有以下 DAG:

from datetime import datetime

from airflow.sdk import DAG
from airflow.sdk import task
from airflow.exceptions import AirflowException
from airflow.providers.standard.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    raise AirflowException("Failing task because one or more upstream tasks failed.")


with DAG(
    dag_id="watcher_example",
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", retries=0)
    passing_task = BashOperator(task_id="passing_task", bash_command="echo passing_task")
    teardown = BashOperator(
        task_id="teardown",
        bash_command="echo teardown",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    failing_task >> passing_task >> teardown
    list(dag.tasks) >> watcher()

执行后该 DAG 的可视化表示如下所示:

_images/watcher.png

我们有几个用于不同目的的任务:

  • failing_task 总是失败,

  • passing_task 总是成功(如果已执行),

  • teardown 总是被触发(无论其他任务的状态如何),并且它应该总是成功,

  • watcher 是每个其他任务的下游任务,即当任何任务失败时它会被触发,从而导致整个 DAG 运行失败,因为它是一个叶子任务。

重要的是要注意,如果没有 watcher 任务,整个 DAG 运行将获得 success 状态,因为唯一的失败任务不是叶子任务,并且 teardown 任务将以 success 结束。如果我们希望 watcher 监视所有任务的状态,我们需要分别让它依赖于所有任务。这样,如果任何任务失败,我们就可以使 DAG 运行失败。请注意,监控任务的触发规则设置为 "one_failed"。另一方面,如果没有 teardown 任务,则不需要 watcher 任务,因为 failing_task 会将其 failed 状态传播到下游任务 passed_task,整个 DAG 运行也将获得 failed 状态。

在集群策略中使用 AirflowClusterPolicySkipDag 异常跳过特定 DAG

在 2.7 版本中添加。

Airflow DAG 通常可以通过 git-sync 使用 Git 存储库的特定分支进行部署和更新。但是,当您由于某种操作原因需要运行多个 Airflow 集群时,维护多个 Git 分支非常繁琐。特别是当您需要周期性地同步两个独立分支(如 prodbeta)且需要合适的同步策略时,会遇到一些困难。

  • cherry-pick 维护 Git 存储库太繁琐。

  • hard-reset 不是 GitOps 推荐的方法。

因此,您可以考虑将多个 Airflow 集群连接到同一个 Git 分支(如 main),并使用不同的环境变量和具有相同 connection_id 的不同连接配置来维护它们。如果需要,您也可以在集群策略上抛出 AirflowClusterPolicySkipDag 异常,以仅在特定的 Airflow 部署上将特定 DAG 加载到 DagBag

def dag_policy(dag: DAG):
    """Skipping the Dag with `only_for_beta` tag."""

    if "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"Dag {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
        )

上面的示例展示了根据标签跳过 DAG 的 dag_policy 代码片段。

降低 DAG 复杂度

虽然 Airflow 在处理大量具有复杂任务和依赖关系的 DAG 方面表现良好,但当您有许多复杂的 DAG 时,它们的复杂度可能会影响调度的性能。保持 Airflow 实例高效和充分利用的方法之一是,尽可能简化和优化您的 DAG——您必须记住,DAG 的解析过程和创建只是执行 Python 代码,如何使其达到最高性能取决于您。没有让 DAG “不那么复杂”的魔法秘诀——因为这是 Python 代码,所以 DAG 编写者控制着其代码的复杂度。

没有 DAG 复杂度的“指标”,特别是没有能告诉您 DAG 是否“足够简单”的指标。但是,与任何 Python 代码一样,您绝对可以说您的 DAG 代码在优化后是“更简单”或“更快”的。如果您想优化 DAG,可以采取以下行动:

  • 让您的 DAG 加载更快。这是一项可以以各种方式实现的单一改进建议,但它对调度器性能的影响最大。如果您的目标是提高性能,每当有机会让您的 DAG 加载更快时,就去做。查看 顶级 Python 代码 以获取有关如何做到这一点的提示。另请查看 DAG 加载器测试,了解如何评估您的 DAG 加载时间。

  • 让您的 DAG 生成更简单的结构。每个任务依赖关系都会为调度和执行增加额外的处理开销。具有简单线性结构 A -> B -> C 的 DAG,比具有深度嵌套树结构且任务依赖关系呈指数级增长的 DAG 在任务调度中遇到的延迟更少。如果您能使 DAG 更加线性——即在执行的单点处只有极少数潜在的任务候选项可供运行——这很可能会提高整体调度性能。

  • 每个文件包含较少的 DAG。虽然 Airflow 2 针对在一个文件中包含多个 DAG 的情况进行了优化,但系统中的某些部分使其有时性能较差,或者与将这些 DAG 分散在多个文件中相比引入了更多延迟。仅一个文件只能由一个 FileProcessor 解析这一事实就使其可扩展性较差。如果您从一个文件中生成了许多 DAG,并且观察到在 Airflow UI 中反映更改需要很长时间,请考虑将它们拆分。

  • 编写高效的 Python 代码。必须在上述“每个文件较少的 DAG”与“总体编写较少的代码”之间取得平衡。创建描述 DAG 的 Python 文件应遵循最佳编程实践,而不应将其视为配置。如果您的 DAG 共享相似的代码,则不应将其一次又一次地复制到大量几乎相同的源文件中,因为这会导致对相同资源的不必要的重复导入。相反,您应该旨在最大限度地减少所有 DAG 中的重复代码,以便应用程序能够高效运行并且易于调试。请参阅 动态 DAG 生成,了解如何创建具有相似代码的多个 DAG。

测试 DAG

Airflow 用户应将 DAG 视为生产级代码,并且 DAG 应具有各种关联的测试,以确保它们产生预期的结果。您可以为 DAG 编写多种测试。让我们来看看其中一些。

DAG 加载器测试

此测试应确保您的 DAG 不包含在加载时引发错误的代码片段。用户无需编写额外代码即可运行此测试。

python your-dag-file.py

在不出现任何错误的情况下运行上述命令可确保您的 DAG 不包含任何未安装的依赖项、语法错误等。请确保在与您的调度器环境相对应的环境中加载 DAG——具有相同的依赖项、环境变量、DAG 引用的通用代码。

如果您想尝试优化 DAG 加载时间,这也是检查 DAG 在优化后是否加载得更快的好方法。只需运行 DAG 并测量所需时间,但同样,您必须确保您的 DAG 使用相同的依赖项、环境变量、通用代码运行。

有许多方法可以测量处理时间,Linux 环境中其中一种方法是使用内置的 time 命令。确保连续运行多次以考虑缓存效果。比较优化前后的结果(在相同条件下——使用相同的机器、环境等),以评估优化的影响。

time python airflow/example_dags/example_python_operator.py

结果:

real    0m0.699s
user    0m0.590s
sys     0m0.108s

重要的指标是“实际时间”——它告诉您处理 DAG 花费了多长时间。请注意,以这种方式加载文件时,您是在启动一个新的解释器,因此存在一个 Airflow 解析 DAG 时不存在的初始加载时间。您可以通过运行以下命令来评估初始化时间:

time python -c ''

结果:

real    0m0.073s
user    0m0.037s
sys     0m0.039s

在这种情况下,初始解释器启动时间约为 0.07 秒,约占解析上述 example_python_operator.py 所需时间的 10%,因此示例 DAG 的实际解析时间约为 0.62 秒。

您可以查看 测试 DAG,了解有关如何测试单个 Operator 的详细信息。

单元测试

单元测试确保您的 DAG 中没有不正确的代码。您可以为任务和 DAG 编写单元测试。

加载 DAG 的单元测试:

import pytest

from airflow.dag_processing.dagbag import DagBag


@pytest.fixture()
def dagbag():
    return DagBag()


def test_dag_loaded(dagbag):
    dag = dagbag.get_dag(dag_id="hello_world")
    assert dagbag.import_errors == {}
    assert dag is not None
    assert len(dag.tasks) == 1

单元测试 DAG 结构: 这是一个想要对照 dict 对象验证代码生成的 DAG 结构的示例测试:

def assert_dag_dict_equal(source, dag):
    assert dag.task_dict.keys() == source.keys()
    for task_id, downstream_list in source.items():
        assert dag.has_task(task_id)
        task = dag.get_task(task_id)
        assert task.downstream_task_ids == set(downstream_list)


def test_dag():
    assert_dag_dict_equal(
        {
            "DummyInstruction_0": ["DummyInstruction_1"],
            "DummyInstruction_1": ["DummyInstruction_2"],
            "DummyInstruction_2": ["DummyInstruction_3"],
            "DummyInstruction_3": [],
        },
        dag,
    )

自定义 Operator 的单元测试:

import pendulum

from airflow.sdk import DAG, TaskInstanceState


def test_my_custom_operator_execute_no_trigger(dag):
    TEST_TASK_ID = "my_custom_operator_task"
    with DAG(
        dag_id="my_custom_operator_dag",
        schedule="@daily",
        start_date=pendulum.datetime(2021, 9, 13, tz="UTC"),
    ) as dag:
        MyCustomOperator(
            task_id=TEST_TASK_ID,
            prefix="s3://bucket/some/prefix",
        )

    dagrun = dag.test()
    ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
    assert ti.state == TaskInstanceState.SUCCESS
    # Assert something related to tasks results: ti.xcom_pull()

自检

您还可以在 DAG 中实现检查,以确保任务按预期产生结果。例如,如果您有一个将数据推送到 S3 的任务,您可以在下一个任务中实现检查。例如,检查可以确保在 S3 中创建了分区,并执行一些简单的检查以确定数据是否正确。

同样,如果您有一个在 Kubernetes 或 Mesos 中启动微服务的任务,您应该使用 airflow.providers.http.sensors.http.HttpSensor 检查服务是否已启动。

task = PushToS3(...)
check = S3KeySensor(
    task_id="check_parquet_exists",
    bucket_key="s3://bucket/key/foo.parquet",
    poke_interval=0,
    timeout=0,
)
task >> check

预发布环境

如果可能,请保持一个预发布环境,以便在部署到生产环境之前测试完整的 DAG 运行。确保您的 DAG 已参数化以更改变量,例如 S3 操作的输出路径或用于读取配置的数据库。不要将值硬编码在 DAG 中,然后根据环境手动更改它们。

您可以使用环境变量来参数化 DAG。

import os

dest = os.environ.get("MY_DAG_DEST_PATH", "s3://default-target/path/")

Mock 变量和连接

当您为使用变量或连接的代码编写测试时,必须确保它们在运行测试时存在。显而易见的解决方案是将这些对象保存到数据库中,以便在执行代码时可以读取它们。但是,对数据库进行读写对象会带来额外的时间开销。为了加快测试执行速度,值得在不将这些对象保存到数据库的情况下模拟它们的存在。为此,您可以使用 unittest.mock.patch.dict() 创建带有模拟 os.environ 的环境变量。

对于变量,请使用 AIRFLOW_VAR_{KEY}

with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
    assert "env-value" == Variable.get("key")

对于连接,请使用 AIRFLOW_CONN_{CONN_ID}

from airflow.sdk import Connection

conn = Connection(
    conn_type="gcpssh",
    login="cat",
    host="conn-host",
)
conn_uri = conn.get_uri()
with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
    assert "cat" == Connection.get("my_conn").login

元数据库维护

随着时间的推移,元数据库将因积累更多的 DAG 和任务运行记录以及事件日志而增加存储占用。

您可以使用 Airflow CLI 通过命令 airflow db clean 清除旧数据。

有关更多详细信息,请参阅 db clean 用法

升级与降级

备份数据库

在进行任何修改数据库的操作之前,备份元数据库始终是一个明智的主意。

禁用调度器

您可能需要考虑在执行此类维护时禁用 Airflow 集群。

一种方法是将参数 [scheduler] > use_job_schedule 设置为 False,并等待任何正在运行的 DAG 完成;之后,除非外部触发,否则不会创建新的 DAG 运行。

一种*更好*的方法(尽管稍微有点手动)是使用 dags pause 命令。您需要在开始此操作之前跟踪已暂停的 DAG,以便知道维护完成后该取消哪些 DAG 的暂停。首先运行 airflow dags list 并存储未暂停的 DAG 列表。然后,使用此列表在维护前对每个 DAG 运行 dags pause,并在维护后运行 dags unpause。这样做的好处是您可以在升级后尝试取消暂停一两个 DAG(也许是专用的 测试 DAG),以确保在全部重新启用之前一切正常。

添加“集成测试” DAG

添加几个使用生态系统中所有常用服务(如 S3、Snowflake、Vault)但使用测试资源或“开发”账户的“集成测试” DAG 会有所帮助。这些测试 DAG 可以是您升级后*首先*启用的 DAG,因为如果它们失败,没关系,您可以回滚到备份,而不会产生负面后果。但是,如果它们成功,它们应该证明您的集群能够运行您需要使用的库和服务所对应的任务。

例如,如果您使用外部 Secrets 后端,请确保有一个检索连接的任务。如果您使用 KubernetesPodOperator,请添加一个运行 sleep 30; echo "hello" 的任务。如果您需要写入 S3,请在测试任务中进行。如果您需要访问数据库,请添加一个执行 select 1 的任务。

升级前清理数据

一些数据库迁移可能很耗时。如果您的元数据库非常大,请考虑在升级前使用 db clean 命令清理一些旧数据。请谨慎使用。

处理冲突/复杂的 Python 依赖

Airflow 有许多 Python 依赖项,有时 Airflow 依赖项会与您的任务代码所期望的依赖项发生冲突。由于默认情况下 Airflow 环境只是一组单一的 Python 依赖项和单一的 Python 环境,因此通常可能会出现某些任务需要与其它任务不同的依赖项,而这些依赖项在这些任务之间基本冲突的情况。

如果您正在使用预定义的 Airflow Operator 与外部服务通信,通常没什么选择,但这些 Operator 通常会有不与基本 Airflow 依赖项冲突的依赖项。Airflow 使用约束 (constraints) 机制,这意味着您拥有一组“固定”的依赖项,社区保证 Airflow 可以在此基础上安装(包括所有社区提供程序)而不会触发冲突。但是,您可以独立升级提供程序,并且它们的约束不会限制您,因此发生冲突依赖项的可能性较低(您仍然需要测试这些依赖项)。因此,当您使用预定义的操作符时,发生依赖项冲突的问题的可能性很小。

然而,当您以更“现代的方式”处理 Airflow 时(即使用 TaskFlow API,并且您的大多数 Operator 是使用自定义 python 代码编写的),或者当您想编写自己的自定义 Operator 时,您可能会遇到自定义代码所需的依赖项与 Airflow 的依赖项冲突的情况,甚至您的几个自定义 Operator 的依赖项之间也可能引入冲突。

有许多策略可以用来缓解这个问题。虽然在自定义 Operator 中处理依赖项冲突很困难,但当使用 airflow.providers.standard.operators.python.PythonVirtualenvOperatorairflow.providers.standard.operators.python.ExternalPythonOperator 时,情况会变得容易得多——无论是直接使用经典的“操作符”方法,还是如果您使用 TaskFlow,则使用装饰了 @task.virtualenv@task.external_python 装饰器的任务。

让我们从最容易实现的策略开始(尽管有一定的限制和开销),我们将逐步浏览那些需要更改 Airflow 部署的策略。

使用 PythonVirtualenvOperator

这是最易于使用但也最受限制的策略。PythonVirtualenvOperator 允许您动态创建一个您的 Python 可调用函数将在其中执行的虚拟环境。在 TaskFlow API 的 Pythonic DAGs 中描述的现代 TaskFlow 方法中,这也可以通过使用 @task.virtualenv 装饰器修饰您的可调用对象来实现(这是推荐的使用 Operator 的方式)。每个 airflow.providers.standard.operators.python.PythonVirtualenvOperator 任务都可以拥有自己独立的 Python 虚拟环境(在每次任务运行时动态创建),并可以指定任务执行所需的精细化依赖项集。

该 Operator 负责:

  • 基于您的环境创建虚拟环境

  • 序列化您的 Python 可调用对象,并将其传递给虚拟环境 Python 解释器执行

  • 执行它并检索可调用对象的结果,并在指定时通过 xcom 推送它

该 Operator 的好处是:

  • 无需预先准备虚拟环境。它将在任务运行前动态创建,并在完成后删除,因此没有什么特别的操作(除了在您的 Airflow 依赖项中拥有 virtualenv 包)即可使用多个虚拟环境

  • 您可以在同一工作节点上运行具有不同依赖集集的任务——从而重用内存资源(尽管请参阅下面关于创建虚拟环境所涉及的 CPU 开销的内容)。

  • 在更大的安装中,DAG 作者不需要要求任何人为您创建虚拟环境。作为 DAG 作者,您只需要安装了 virtualenv 依赖项,就可以根据需要指定和修改环境。

  • 部署需求没有变化——无论您使用本地虚拟环境、Docker 还是 Kubernetes,任务都将在无需向部署添加任何内容的情况下工作。

  • 作为 DAG 作者,无需学习更多关于容器或 Kubernetes 的知识。以这种方式编写 DAG 只需要了解 Python 需求。

该 Operator 存在一定的局限性和开销:

  • 您的 python 可调用对象必须是可序列化的。有许多 Python 对象不能使用标准 pickle 库序列化。您可以使用 dill 库来减轻其中的一些限制,但即使该库也无法解决所有序列化限制。

  • Airflow 环境中不可用的所有依赖项必须在您使用的可调用对象中局部导入,并且您的 DAG 的顶级 Python 代码不应导入/使用这些库。

  • 虚拟环境在相同的操作系统中运行,因此它们不能拥有冲突的系统级依赖项(aptyum 可安装软件包)。只有 Python 依赖项可以在这些环境中独立安装。

  • 该 Operator 为运行每个任务增加了 CPU、网络和经过时间的开销——Airflow 必须为每个任务从头开始重新创建虚拟环境

  • 工作节点需要有权访问 PyPI 或私有存储库来安装依赖项

  • 虚拟环境的动态创建容易出现暂时性故障(例如,当您的存储库不可用时,或者当与存储库连接时存在网络问题时)

  • 容易陷入“过于”动态的环境——因为您安装的依赖项可能会升级,它们的传递依赖项可能会独立升级,您可能会遇到这样的情况:由于有人发布了新版本的依赖项,您的任务停止工作,或者您可能成为“供应链”攻击的受害者,即新版本的依赖项可能变得恶意

  • 任务仅通过在不同的环境中运行来实现彼此隔离。这使得运行的任务仍然可能相互干扰——例如,在同一工作节点上执行的后续任务可能会受到之前任务创建/修改文件等操作的影响。

您可以在 TaskFlow API 教程的这一节 中查看使用 airflow.providers.standard.operators.python.PythonVirtualenvOperator 的详细示例。

使用 ExternalPythonOperator

Added in version 2.4.

使用 airflow.providers.standard.operators.python.ExternalPythonOperator 会更复杂一些,但带来的开销、安全性和稳定性问题会显著减少。在 TaskFlow API 的 Pythonic DAGs 中描述的现代 TaskFlow 方法中,这也可以通过使用 @task.external_python 装饰器修饰您的可调用对象来实现(这是推荐的使用 Operator 的方式)。然而,它要求您拥有一个预先准备好的、不可变的 Python 环境。与 airflow.providers.standard.operators.python.PythonVirtualenvOperator 不同,您不能向此类预先存在的环境添加新的依赖项。您需要的所有依赖项都应预先添加到您的环境中,并且如果您的 Airflow 在分布式环境中运行,则应在所有工作节点上可用。

通过这种方式,您可以避免重新创建虚拟环境的开销和问题,但它们必须与 Airflow 安装一起准备和部署。通常,管理 Airflow 安装的人员需要参与,在更大的安装中,这些人通常与 DAG 作者(DevOps/系统管理员)不同。

这些虚拟环境可以通过多种方式准备——如果您使用 LocalExecutor,它们只需要安装在运行调度器的机器上;如果您使用分布式的 Celery 虚拟环境安装,则应该有一个在多台机器上安装这些虚拟环境的流水线;最后,如果您使用 Docker 镜像(例如通过 Kubernetes),则应将虚拟环境的创建添加到您的自定义镜像构建流水线中。

该 Operator 的好处是:

  • 运行任务时没有设置开销。当您开始运行任务时,虚拟环境就已经准备好了。

  • 您可以在同一工作节点上运行具有不同依赖集的任务——从而重用所有资源。

  • 不需要工作节点访问 PyPI 或私有存储库。由于网络导致的暂时性错误的可能性减少。

  • 依赖项可以由管理员和安全团队预先审查,不会动态添加意想不到的新代码。这对于安全性和稳定性都有好处。

  • 对您的部署影响有限——您不需要切换到 Docker 容器或 Kubernetes 即可充分利用此 Operator。

  • 作为 Dag 编写者,无需深入了解容器或 Kubernetes。只需具备 Python 和需求(requirements)方面的知识,即可通过这种方式编写 Dag。

缺点

  • 你的环境需要提前准备好虚拟环境。这意味着通常无法即时更改环境,添加新的或更改依赖需求至少需要重新部署 Airflow;当你在开发新版本时,迭代时间可能会更长。

  • 您的 python 可调用对象必须是可序列化的。有许多 Python 对象不能使用标准 pickle 库序列化。您可以使用 dill 库来减轻其中的一些限制,但即使该库也无法解决所有序列化限制。

  • 所有 Airflow 环境中不可用的依赖项必须在所使用的可调用对象(callable)中进行本地导入,且 Dag 的顶层 Python 代码不应导入或使用这些库。

  • 虚拟环境运行在同一个操作系统中,因此它们不能有冲突的系统级依赖(即 aptyum 可安装的软件包)。在这些环境中,只有 Python 依赖项可以被独立安装。

  • 任务仅通过在不同的环境中运行来实现彼此隔离。这使得运行的任务仍然可能相互干扰——例如,在同一工作节点上执行的后续任务可能会受到之前任务创建/修改文件等操作的影响。

你可以将 PythonVirtualenvOperatorExternalPythonOperator 看作是对应的工具——它们使从开发阶段过渡到生产阶段的过程更加顺畅。作为 Dag 编写者,你通常会使用 PythonVirtualenvOperator 来迭代依赖并开发 Dag(即使用 @task.virtualenv 装饰器修饰任务),而在完成迭代和更改后,你通常希望在生产环境中切换到 ExternalPythonOperator(以及 @task.external_python),前提是 DevOps/系统管理团队已经在生产环境的预置虚拟环境中部署了你的新依赖。这样做的好处是,你可以随时将装饰器切换回来,并继续使用 PythonVirtualenvOperator 进行“动态”开发。

你可以在 TaskFlow 外部 Python 示例 中查看使用 airflow.providers.standard.operators.python.ExternalPythonOperator 的详细示例。

使用 DockerOperator 或 Kubernetes Pod Operator

另一种策略是使用 airflow.providers.docker.operators.docker.DockerOperatorairflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator。这些操作符要求 Airflow 能够访问 Docker 引擎或 Kubernetes 集群。

与 Python 操作符的情况类似,如果你想使用这些操作符来执行可调用的 Python 代码,TaskFlow 装饰器将会非常方便。

然而,这种方法涉及面更广——如果你想使用它,你需要了解 Docker/Kubernetes Pods 的工作原理,但任务之间是完全隔离的,你甚至不仅限于运行 Python 代码。你可以用任何你想要的编程语言编写任务。此外,你的依赖项与 Airflow 的依赖项完全独立(包括系统级依赖项),因此如果你的任务需要非常不同的环境,这是最佳选择。

在 2.2 版本中添加。

从 Airflow 2.2 版本开始,你可以使用 @task.docker 装饰器通过 DockerOperator 运行你的函数。

Added in version 2.4.

从 Airflow 2.2 版本开始,你可以使用 @task.kubernetes 装饰器通过 KubernetesPodOperator 运行你的函数。

使用这些操作符的好处是:

  • 你可以运行具有不同 Python 和系统级依赖项集合的任务,甚至可以运行用完全不同语言编写的任务,或者在不同的处理器架构(x86 与 arm)上运行任务。

  • 用于运行任务的环境享受容器的优化和不可变性,其中一组相似的依赖项可以有效地重用镜像中缓存的多个层,因此这种环境针对拥有多个相似但不同的环境的情况进行了优化。

  • 依赖项可以由管理员和安全团队预先审查,不会动态添加意想不到的新代码。这对于安全性和稳定性都有好处。

  • 任务之间具有强大的进程级隔离。任务运行在单独的容器/Pod 中,不能在进程或文件系统级别相互影响。它们仍然可以通过执行 API 使用标准 Airflow 机制(XCom、连接、变量)进行交互。有关完整的隔离模型,请参阅 Airflow 安全模型

缺点

  • 启动任务会有一定的开销。通常不如动态创建虚拟环境的开销大,但仍然很显著(特别是对于 KubernetesPodOperator)。

  • 在使用 TaskFlow 装饰器的情况下,整个要调用的方法需要被序列化并发送到 Docker 容器或 Kubernetes Pod,并且方法的大小在系统级有限制。在远程端对方法进行序列化、发送和最终的反序列化也会增加开销。

  • 存在因需要多个进程而带来的资源开销。对于这两个操作符,运行任务至少需要两个进程——一个进程(运行在 Docker 容器或 Kubernetes Pod 中)执行任务,另一个在 Airflow worker 中的监督进程负责将作业提交给 Docker/Kubernetes 并监视执行情况。

  • 你的环境需要提前准备好容器镜像。这意味着通常无法即时更改它们。添加系统依赖、修改或更改 Python 需求需要重新构建并发布镜像(通常是在你的私有注册表中)。当你更改依赖项时,迭代时间通常会更长,并且要求开发人员在迭代过程中构建并使用他们自己的镜像。为了可靠地维护部署,合适的部署流水线在此至关重要。

  • 如果你想通过装饰器运行 Python 可调用对象,则该对象必须是可序列化的;此外,在这种情况下,所有 Airflow 环境中不可用的依赖项必须在所使用的可调用对象中进行本地导入,且 Dag 的顶层 Python 代码不应导入或使用这些库。

  • 你需要更详细地了解 Docker 容器或 Kubernetes 的工作原理。这两个操作符提供的抽象是“有泄漏的”(leaky),因此你需要了解更多关于资源、网络、容器等方面的知识,才能编写出使用这些操作符的 Dag。

你可以在 TaskFlow Docker 示例 中查看使用 airflow.operators.providers.Docker 的详细示例,以及在 TaskFlow Kubernetes 示例 中查看 airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator 的详细示例。

使用多个 Docker 镜像和 Celery 队列

有一种可能性(尽管需要深入了解 Airflow 部署)是使用多个独立的 Docker 镜像来运行 Airflow 任务。这可以通过将不同的任务分配给不同的队列,并配置 Celery workers 为不同的队列使用不同的镜像来实现。然而,这(至少目前)需要大量的手动部署配置以及对 Airflow、Celery 和 Kubernetes 工作原理的深刻理解。此外,它还为运行任务引入了相当多的开销——资源重用的机会较少,且在不影响性能和稳定性的情况下,很难为这种部署微调资源成本。

使其更有用的一种可能方式是 AIP-46 Airflow 任务和 Dag 解析的运行时隔离 以及完成 AIP-43 Dag 处理器分离。在这些功能实现之前,使用这种方法的好处微乎其微,因此不建议使用。

然而,当这些 AIP 实现后,这将开启一种更具多租户特色的方法,届时多个团队将能够拥有完全隔离的依赖项集合,这些依赖项将贯穿 Dag 的整个生命周期——从解析到执行。

此条目是否有帮助?