最佳实践

创建一个新的 DAG 分为三个步骤

  • 编写 Python 代码来创建一个 DAG 对象,

  • 测试代码是否符合您的期望,

  • 配置环境依赖项以运行您的 DAG

本教程将向您介绍这三个步骤的最佳实践。

编写 DAG

在 Airflow 中创建一个新的 DAG 非常简单。但是,您需要注意很多事项,以确保 DAG 运行或失败不会产生意外结果。

创建自定义 Operator/Hook

请遵循我们的自定义运算符指南。

创建任务

您应该将 Airflow 中的任务视为数据库中的事务。这意味着您永远不应该从您的任务中产生不完整的结果。一个例子是在任务结束时不要在 HDFSS3 中产生不完整的数据。

如果任务失败,Airflow 可以重试任务。因此,任务应该在每次重新运行时产生相同的结果。以下是一些您可以避免产生不同结果的方法 -

  • 不要在任务重新运行时使用 INSERT,INSERT 语句可能会导致数据库中出现重复的行。请将其替换为 UPSERT。

  • 在特定分区中读取和写入。永远不要在任务中读取最新的可用数据。在重新运行之间,可能会有人更新输入数据,从而导致不同的输出。更好的方法是从特定分区读取输入数据。您可以使用 data_interval_start 作为分区。在 S3/HDFS 中写入数据时,您也应该遵循这种分区方法。

  • Python 日期时间 now() 函数返回当前的日期时间对象。此函数绝不应在任务内部使用,尤其是在执行关键计算时,因为它会导致每次运行的结果不同。例如,使用它来生成临时日志是可以的。

提示

您应该在 default_args 中定义重复的参数,例如 connection_id 或 S3 路径,而不是为每个任务声明它们。default_args 有助于避免诸如错别字之类的错误。此外,大多数连接类型在任务中都有唯一的参数名称,因此您可以在 default_args 中仅声明一次连接(例如 gcp_conn_id),并且所有使用此连接类型的运算符都会自动使用它。

删除任务

从 DAG 中删除任务时请小心。您将无法在图形视图、网格视图等中看到该任务,从而难以从 Web 服务器检查该任务的日志。如果不需要这样做,请创建一个新的 DAG。

通信

如果您使用 Kubernetes 执行器Celery 执行器,Airflow 会在不同的服务器上执行 DAG 的任务。因此,您不应在本地文件系统中存储任何文件或配置,因为下一个任务很可能会在另一台无法访问它的服务器上运行 — 例如,一个任务下载数据文件,而下一个任务会处理该文件。在 Local executor 的情况下,在磁盘上存储文件可能会使重试更加困难,例如,您的任务需要一个配置,该配置被 DAG 中的另一个任务删除。

如果可能,请使用 XCom 在任务之间传递小消息,在任务之间传递较大数据的最佳方式是使用远程存储,例如 S3/HDFS。例如,如果我们有一个任务将处理后的数据存储在 S3 中,那么该任务可以将输出数据的 S3 路径推送到 Xcom 中,下游任务可以从 XCom 中拉取该路径并使用它来读取数据。

任务也不应在其中存储任何身份验证参数,例如密码或令牌。在任何可能的情况下,请使用连接将数据安全地存储在 Airflow 后端,并使用唯一的连接 ID 检索它们。

顶层 Python 代码

您应该避免编写对于创建运算符和构建它们之间的 DAG 关系而言不必要的顶层代码。这是因为 Airflow 调度器的设计决策以及顶层代码解析速度对 Airflow 性能和可伸缩性的影响。

Airflow 调度器以最小的 min_file_process_interval 秒的间隔执行运算符的 execute 方法之外的代码。这样做是为了允许 DAG 的动态调度 - 其中调度和依赖关系可能会随着时间的推移而改变,并影响 DAG 的下一个计划。Airflow 调度器尝试持续确保您在 DAG 中所拥有的内容能够正确反映在计划的任务中。

特别是,您不应运行任何数据库访问、繁重的计算和网络操作。

影响 DAG 加载时间的重要因素之一(可能会被 Python 开发人员忽略)是,顶层导入可能会花费大量时间,并且它们可能会产生大量开销,可以通过将它们转换为 Python 可调用对象中的本地导入来轻松避免这种情况,例如。

考虑下面的两个示例。在第一个示例中,DAG 的解析时间将比第二个示例中功能相同的 DAG 多 1000 秒,在第二个示例中,expensive_api_call 是从其任务的上下文中执行的。

不避免顶层 DAG 代码

import pendulum

from airflow import DAG
from airflow.decorators import task


def expensive_api_call():
    print("Hello from Airflow!")
    sleep(1000)


my_expensive_response = expensive_api_call()

with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        print(my_expensive_response)

避免顶层 DAG 代码

import pendulum

from airflow import DAG
from airflow.decorators import task


def expensive_api_call():
    sleep(1000)
    return "Hello from Airflow!"


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        my_expensive_response = expensive_api_call()
        print(my_expensive_response)

在第一个示例中,每次解析 DAG 文件时都会执行 expensive_api_call,这将导致 DAG 文件处理的性能不佳。在第二个示例中,expensive_api_call 仅在任务运行时调用,因此能够在不遭受任何性能影响的情况下进行解析。要自己测试一下,请实现第一个 DAG 并查看调度器日志中打印的“Hello from Airflow!”!

请注意,导入语句也算作顶层代码。因此,如果您有一个耗时的导入语句或导入的模块本身在顶层执行代码,这也可能会影响调度器的性能。以下示例说明了如何处理昂贵的导入。

# It's ok to import modules that are not expensive to load at top-level of a DAG file
import random
import pendulum

# Expensive imports should be avoided as top level imports, because DAG files are parsed frequently, resulting in top-level code being executed.
#
# import pandas
# import torch
# import tensorflow
#

...


@task()
def do_stuff_with_pandas_and_torch():
    import pandas
    import torch

    # do some operations using pandas and torch


@task()
def do_stuff_with_tensorflow():
    import tensorflow

    # do some operations using tensorflow

如何检查我的代码是否是“顶层”代码

为了了解您的代码是否是“顶层”,您需要了解 Python 解析工作原理的许多复杂性。一般来说,当 Python 解析 Python 文件时,它会执行它看到的代码,除了(通常)它不执行的方法的内部代码。

它有许多不明显的特殊情况 - 例如,顶层代码也意味着用于确定方法的默认值的任何代码。

但是,有一种简单的方法可以检查您的代码是否是“顶层”。您只需解析您的代码并查看该段代码是否被执行。

想象一下这段代码

from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum


def get_task_id():
    return "print_array_task"  # <- is that code going to be executed?


def get_array():
    return [1, 2, 3]  # <- is that code going to be executed?


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    operator = PythonOperator(
        task_id=get_task_id(),
        python_callable=get_array,
        dag=dag,
    )

为了检查代码,你可以添加一些打印语句到你要检查的代码中,然后运行 python <my_dag_file>.py

from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum


def get_task_id():
    print("Executing 1")
    return "print_array_task"  # <- is that code going to be executed? YES


def get_array():
    print("Executing 2")
    return [1, 2, 3]  # <- is that code going to be executed? NO


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    operator = PythonOperator(
        task_id=get_task_id(),
        python_callable=get_array,
        dag=dag,
    )

当你执行这段代码时,你会看到

root@cf85ab34571e:/opt/airflow# python /files/test_python.py
Executing 1

这意味着 get_array 没有作为顶层代码执行,而 get_task_id 则是在顶层执行的。

动态 DAG 生成

有时手动编写 DAG 并不实用。也许你有很多 DAG 做着类似的事情,只是参数在它们之间变化。或者你可能需要一组 DAG 来加载表,但不想每次这些表更改时都手动更新 DAG。在这些和其他情况下,动态生成 DAG 可能更有用。

避免在顶层代码中进行过多的处理,这在动态 DAG 配置的情况下尤其重要,动态 DAG 配置基本上可以通过以下方式之一进行配置:

  • 通过 环境变量(不要与 Airflow 变量混淆)

  • 通过外部提供的、生成的 Python 代码,包含 DAG 文件夹中的元数据

  • 通过外部提供的、生成的配置文件元数据,位于 DAG 文件夹中

动态 DAG 生成的一些案例在 动态 DAG 生成 部分中描述。

Airflow 变量

使用 Airflow 变量会产生网络调用和数据库访问,因此应尽可能避免在 DAG 的顶层 Python 代码中使用它们,如上一章 顶层 Python 代码 中所述。如果必须在顶层 DAG 代码中使用 Airflow 变量,则可以通过启用实验性缓存来减轻它们对 DAG 解析的影响,并配置合理的 ttl

你可以在操作符的 execute() 方法中自由使用 Airflow 变量,但你也可以通过 Jinja 模板将 Airflow 变量传递给现有的操作符,这将延迟读取值直到任务执行。

执行此操作的模板语法是

{{ var.value.<variable_name> }}

或者,如果你需要从变量中反序列化一个 json 对象

{{ var.json.<variable_name> }}

在顶层代码中,使用 Jinja 模板的变量在任务运行之前不会产生请求,而 Variable.get() 如果未启用缓存,则每次调度程序解析 dag 文件时都会产生请求。在没有启用缓存的情况下使用 Variable.get() 会导致 dag 文件处理性能不佳。在某些情况下,这可能会导致 dag 文件在完全解析之前超时。

反例

from airflow.models import Variable

foo_var = Variable.get("foo")  # AVOID THAT
bash_use_variable_bad_1 = BashOperator(
    task_id="bash_use_variable_bad_1", bash_command="echo variable foo=${foo_env}", env={"foo_env": foo_var}
)

bash_use_variable_bad_2 = BashOperator(
    task_id="bash_use_variable_bad_2",
    bash_command=f"echo variable foo=${Variable.get('foo')}",  # AVOID THAT
)

bash_use_variable_bad_3 = BashOperator(
    task_id="bash_use_variable_bad_3",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": Variable.get("foo")},  # AVOID THAT
)

正例

bash_use_variable_good = BashOperator(
    task_id="bash_use_variable_good",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": "{{ var.value.get('foo') }}"},
)
@task
def my_task():
    var = Variable.get("foo")  # This is ok since my_task is called only during task run, not during DAG scan.
    print(var)

出于安全目的,建议对任何包含敏感数据的变量使用 Secrets Backend

时间表

避免在时间表代码的顶层使用 Airflow 变量/连接或访问 airflow 数据库。数据库访问应延迟到 DAG 的执行时间。这意味着你不应该将变量/连接检索作为时间表类初始化的参数,或者在自定义时间表模块的顶层拥有变量/连接。

反例

from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something=Variable.get("something"), **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)

正例

from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something="something", **kwargs):
        self._something = Variable.get(something)
        super().__init__(*args, **kwargs)

更改后触发 DAG

避免在更改 DAG 或 DAG 文件夹中更改的任何其他附带文件后立即触发 DAG。

你应该给系统足够的时间来处理更改后的文件。这需要几个步骤。首先,文件必须分发到调度程序 - 通常通过分布式文件系统或 Git-Sync,然后调度程序必须解析 Python 文件并将其存储在数据库中。根据你的配置、分布式文件系统的速度、文件数量、DAG 数量、文件中的更改数量、文件大小、调度程序数量、CPU 速度,这可能需要几秒到几分钟,在极端情况下可能需要几分钟。你应该等待你的 DAG 出现在 UI 中才能触发它。

如果你发现更新和准备好触发之间有很长的延迟,你可以查看以下配置参数并根据你的需要进行微调(通过链接查看每个参数的详细信息):

带有触发规则的观察者模式示例

观察者模式是我们调用具有“监视”其他任务状态的任务的 DAG 的方式。其主要目的是在任何其他任务失败时使 DAG 运行失败。这个需求来自 Airflow 系统测试,它们是具有不同任务的 DAG(类似于包含步骤的测试)。

通常,当任何任务失败时,所有其他任务都不会执行,并且整个 DAG 运行也会失败。但是,当我们使用触发规则时,我们可以中断正在运行的任务的正常流程,并且整个 DAG 可能表示我们期望的不同状态。例如,我们可以有一个清理任务(触发规则设置为 TriggerRule.ALL_DONE),它将无论其他任务的状态如何都执行(例如,清理资源)。在这种情况下,DAG 将始终运行此任务,并且 DAG 运行将获得此特定任务的状态,因此我们可能会丢失有关失败任务的信息。如果我们想确保带有清理任务的 DAG 在任何任务失败时都会失败,我们需要使用观察者模式。观察者任务是一个如果被触发将始终失败的任务,但只有在任何其他任务失败时才需要触发它。它需要将触发规则设置为 TriggerRule.ONE_FAILED,并且它还需要是 DAG 中所有其他任务的下游任务。因此,如果每个其他任务都通过,则将跳过观察者,但是当出现问题时,将执行并失败观察者任务,从而使 DAG 运行也失败。

注意

请注意,触发规则仅依赖于直接上游(父)任务,例如 TriggerRule.ONE_FAILED 将忽略任何失败(或 upstream_failed)的非参数化任务的直接父任务。

通过一个例子更容易理解这个概念。假设我们有以下 DAG

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    raise AirflowException("Failing task because one or more upstream tasks failed.")


with DAG(
    dag_id="watcher_example",
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", retries=0)
    passing_task = BashOperator(task_id="passing_task", bash_command="echo passing_task")
    teardown = BashOperator(
        task_id="teardown",
        bash_command="echo teardown",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    failing_task >> passing_task >> teardown
    list(dag.tasks) >> watcher()

执行后此 DAG 的可视化表示如下所示

_images/watcher.png

我们有几个任务用于不同的目的

  • failing_task 总是失败,

  • passing_task 总是成功(如果执行),

  • teardown 总是被触发(无论其他任务的状态如何),并且它应该总是成功,

  • watcher 是每个其他任务的下游任务,即当任何任务失败时它将被触发,从而使整个 DAG 运行失败,因为它是一个叶子任务。

重要的是要注意,如果没有 watcher 任务,整个 DAG 运行将获得 success 状态,因为唯一失败的任务不是叶子任务,并且 teardown 任务将以 success 完成。如果我们希望 watcher 监视所有任务的状态,我们需要使其单独依赖于所有任务。因此,如果任何任务失败,我们可以使 DAG 运行失败。请注意,观察者任务的触发规则设置为 "one_failed"。另一方面,如果没有 teardown 任务,则不需要 watcher 任务,因为 failing_task 将将其 failed 状态传播到下游任务 passed_task,并且整个 DAG 运行也将获得 failed 状态。

在集群策略中使用 AirflowClusterPolicySkipDag 异常来跳过特定的 DAG

2.7 版本中的新功能。

Airflow DAG 通常可以通过 git-sync 使用 Git 存储库的特定分支进行部署和更新。但是,当您出于某些操作原因而必须运行多个 Airflow 集群时,维护多个 Git 分支非常麻烦。特别是当您需要定期同步两个单独的分支(如 prodbeta)并使用适当的分支策略时,会遇到一些困难。

  • cherry-pick 对于维护 Git 存储库来说太麻烦了。

  • hard-reset 不是 GitOps 的推荐方式

因此,您可以考虑将多个 Airflow 集群连接到同一个 Git 分支(如 main),并通过不同的环境变量和不同的连接配置来维护它们,但使用相同的 connection_id。如果需要,您还可以在集群策略中引发 AirflowClusterPolicySkipDag 异常,以便仅在特定的 Airflow 部署中将特定的 DAG 加载到 DagBag 中。

def dag_policy(dag: DAG):
    """Skipping the DAG with `only_for_beta` tag."""

    if "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
        )

上面的示例展示了 dag_policy 代码片段,用于根据 DAG 的标签跳过 DAG。

降低 DAG 复杂性

虽然 Airflow 擅长处理大量具有许多任务和任务之间依赖关系的 DAG,但是当您有许多复杂的 DAG 时,它们的复杂性可能会影响调度的性能。为了保持 Airflow 实例的高性能和良好利用率,您应该尽可能简化和优化 DAG。您必须记住,DAG 解析过程和创建只是执行 Python 代码,而您有责任使其尽可能高效。没有神奇的秘诀可以使您的 DAG “不那么复杂”——因为这是 Python 代码,DAG 编写者控制着他们代码的复杂性。

没有用于 DAG 复杂性的“指标”,特别是,没有指标可以告诉您您的 DAG 是否“足够简单”。但是,与任何 Python 代码一样,您在优化代码后绝对可以判断您的 DAG 代码是否“更简单”或“更快”。如果要优化 DAG,可以采取以下操作

  • 使您的 DAG 加载更快。这是一个单独的改进建议,可以通过多种方式实现,但这是对调度程序性能影响最大的建议。只要有机会使 DAG 加载更快,如果您的目标是提高性能,就应该这样做。查看顶层 Python 代码以获取有关如何执行此操作的一些提示。另请参阅DAG 加载器测试,了解如何评估您的 DAG 加载时间。

  • 使您的 DAG 生成更简单的结构。每个任务依赖都会为调度和执行增加额外的处理开销。具有简单线性结构的 DAG A -> B -> C 在任务调度中的延迟将比具有深度嵌套的树结构的 DAG(例如,依赖任务的数量呈指数增长)少。如果您可以使您的 DAG 更具线性——在执行的单个点上,运行任务的潜在候选者尽可能少,这可能会提高整体调度性能。

  • 减少每个文件中的 DAG 数量。虽然 Airflow 2 针对在一个文件中包含多个 DAG 的情况进行了优化,但系统的某些部分有时会降低性能,或者比将这些 DAG 分散到许多文件中引入更多的延迟。例如,仅一个文件只能由一个 FileProcessor 解析,就使其可扩展性降低。如果从一个文件生成了许多 DAG,如果您发现 Airflow UI 中反映 DAG 文件中的更改需要很长时间,请考虑拆分它们。

  • 编写高效的 Python 代码。如上所述,每个文件的 DAG 数量较少,以及编写较少的总体代码之间必须取得平衡。创建描述 DAG 的 Python 文件应遵循最佳编程实践,而不应将其视为配置。如果您的 DAG 共享相似的代码,则不应将它们一次又一次地复制到大量几乎相同的源文件中,因为这会导致对相同资源的许多不必要的重复导入。相反,您应该尽量减少所有 DAG 中的重复代码,以便应用程序可以高效运行并且可以轻松调试。请参阅动态 DAG 生成,了解如何创建具有相似代码的多个 DAG。

测试 DAG

Airflow 用户应将 DAG 视为生产级代码,并且 DAG 应具有各种关联的测试,以确保它们产生预期的结果。您可以为 DAG 编写各种各样的测试。让我们看看其中的一些。

DAG 加载器测试

此测试应确保您的 DAG 不包含在加载时引发错误的代码段。用户无需编写额外的代码即可运行此测试。

python your-dag-file.py

运行上述命令而没有任何错误可确保您的 DAG 不包含任何未安装的依赖项、语法错误等。确保在与您的调度程序环境相对应的环境中加载 DAG——具有相同的依赖项、环境变量、DAG 中引用的通用代码。

如果您想尝试优化 DAG 加载时间,这也是一种很好的方法来检查 DAG 在优化后是否加载得更快。只需运行 DAG 并测量其所需的时间,但同样您必须确保 DAG 在相同的依赖项、环境变量、通用代码下运行。

有很多方法可以测量处理时间,其中一种在 Linux 环境中使用内置的 time 命令。请确保连续运行多次以考虑缓存效应。比较优化前后的结果(在相同条件下——使用相同的机器、环境等),以评估优化的影响。

time python airflow/example_dags/example_python_operator.py

结果

real    0m0.699s
user    0m0.590s
sys     0m0.108s

重要的指标是“实际时间”——它告诉您处理 DAG 花费了多长时间。请注意,以这种方式加载文件时,您正在启动一个新的解释器,因此当 Airflow 解析 DAG 时,会有一个初始加载时间。您可以通过运行来评估初始化时间

time python -c ''

结果

real    0m0.073s
user    0m0.037s
sys     0m0.039s

在这种情况下,初始解释器启动时间约为 0.07 秒,约占上面解析 example_python_operator.py 所需时间的 10%,因此示例 DAG 的实际解析时间约为 0.62 秒。

您可以查看 测试 DAG,了解有关如何测试单个运算符的详细信息。

单元测试

单元测试确保您的 DAG 中没有错误的代码。您可以为您的任务和 DAG 编写单元测试。

加载 DAG 的单元测试

import pytest

from airflow.models import DagBag


@pytest.fixture()
def dagbag():
    return DagBag()


def test_dag_loaded(dagbag):
    dag = dagbag.get_dag(dag_id="hello_world")
    assert dagbag.import_errors == {}
    assert dag is not None
    assert len(dag.tasks) == 1

单元测试 DAG 结构:这是一个示例测试,希望根据 dict 对象验证代码生成的 DAG 的结构

def assert_dag_dict_equal(source, dag):
    assert dag.task_dict.keys() == source.keys()
    for task_id, downstream_list in source.items():
        assert dag.has_task(task_id)
        task = dag.get_task(task_id)
        assert task.downstream_task_ids == set(downstream_list)


def test_dag():
    assert_dag_dict_equal(
        {
            "DummyInstruction_0": ["DummyInstruction_1"],
            "DummyInstruction_1": ["DummyInstruction_2"],
            "DummyInstruction_2": ["DummyInstruction_3"],
            "DummyInstruction_3": [],
        },
        dag,
    )

自定义运算符的单元测试

import datetime

import pendulum
import pytest

from airflow import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)

TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"


@pytest.fixture()
def dag():
    with DAG(
        dag_id=TEST_DAG_ID,
        schedule="@daily",
        start_date=DATA_INTERVAL_START,
    ) as dag:
        MyCustomOperator(
            task_id=TEST_TASK_ID,
            prefix="s3://bucket/some/prefix",
        )
    return dag


def test_my_custom_operator_execute_no_trigger(dag):
    dagrun = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=DATA_INTERVAL_START,
        data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
        start_date=DATA_INTERVAL_END,
        run_type=DagRunType.MANUAL,
    )
    ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
    ti.task = dag.get_task(task_id=TEST_TASK_ID)
    ti.run(ignore_ti_state=True)
    assert ti.state == TaskInstanceState.SUCCESS
    # Assert something related to tasks results.

自检

您还可以在 DAG 中实现检查,以确保任务产生预期的结果。例如,如果您有一个将数据推送到 S3 的任务,则可以在下一个任务中实现一个检查。例如,检查可以确保在 S3 中创建分区,并执行一些简单的检查以确定数据是否正确。

同样,如果您有一个在 Kubernetes 或 Mesos 中启动微服务的任务,您应该使用 airflow.providers.http.sensors.http.HttpSensor 检查服务是否已启动。

task = PushToS3(...)
check = S3KeySensor(
    task_id="check_parquet_exists",
    bucket_key="s3://bucket/key/foo.parquet",
    poke_interval=0,
    timeout=0,
)
task >> check

暂存环境

如果可能,请保留一个暂存环境,以便在部署到生产环境之前测试完整的 DAG 运行。确保您的 DAG 已参数化以更改变量,例如 S3 操作的输出路径或用于读取配置的数据库。不要在 DAG 内部硬编码值,然后根据环境手动更改它们。

您可以使用环境变量来参数化 DAG。

import os

dest = os.environ.get("MY_DAG_DEST_PATH", "s3://default-target/path/")

模拟变量和连接

当您为使用变量或连接的代码编写测试时,您必须确保在运行测试时它们存在。显而易见的解决方案是将这些对象保存到数据库中,以便在代码执行时可以读取它们。但是,向数据库读取和写入对象会增加额外的时间开销。为了加快测试执行速度,值得在不将这些对象保存到数据库的情况下模拟这些对象的存在。为此,您可以使用模拟 os.environ 的环境变量,使用 unittest.mock.patch.dict()

对于变量,请使用 AIRFLOW_VAR_{KEY}

with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
    assert "env-value" == Variable.get("key")

对于连接,请使用 AIRFLOW_CONN_{CONN_ID}

conn = Connection(
    conn_type="gcpssh",
    login="cat",
    host="conn-host",
)
conn_uri = conn.get_uri()
with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
    assert "cat" == Connection.get_connection_from_secrets("my_conn").login

元数据数据库维护

随着更多 DAG 和任务运行以及事件日志的累积,元数据数据库的存储占用空间将随着时间的推移而增加。

您可以使用 Airflow CLI 使用命令 airflow db clean 清除旧数据。

有关更多详细信息,请参阅 db clean usage

升级和降级

备份数据库

在进行任何修改数据库的操作之前,备份元数据数据库始终是一个明智的主意。

禁用调度程序

您可能会考虑在执行此类维护时禁用 Airflow 集群。

一种方法是将参数 [scheduler] > use_job_schedule 设置为 False 并等待任何正在运行的 DAG 完成;在此之后,除非外部触发,否则不会创建新的 DAG 运行。

一个更好的方法(尽管稍微需要手动操作)是使用 dags pause 命令。您需要记录在开始此操作之前已暂停的 DAG,以便知道在维护完成后取消暂停哪些 DAG。首先运行 airflow dags list 并存储未暂停的 DAG 列表。然后使用此列表在维护之前对每个 DAG 运行 dags pause,并在维护之后运行 dags unpause。这样做的好处是,您可以在升级后尝试仅取消暂停一个或两个 DAG(可能是专用的测试 DAG),以确保在重新启用所有内容之前一切正常。

添加“集成测试”DAG

添加几个“集成测试”DAG 可能会有所帮助,这些 DAG 使用您的生态系统中的所有常见服务(例如 S3、Snowflake、Vault),但使用测试资源或“开发”帐户。这些测试 DAG 可以是您在升级后首先启用的 DAG,因为如果它们失败了,这并不重要,您可以恢复到备份而不会产生负面后果。但是,如果它们成功,它们应该证明您的集群能够使用您需要使用的库和服务运行任务。

例如,如果您使用外部密钥后端,请确保您有一个检索连接的任务。如果您使用 KubernetesPodOperator,请添加一个运行 sleep 30; echo "hello" 的任务。如果您需要写入 S3,请在测试任务中执行此操作。如果您需要访问数据库,请添加一个执行 select 1 从服务器获取数据的任务。

升级前修剪数据

一些数据库迁移可能非常耗时。如果您的元数据数据库非常大,请考虑在执行升级之前使用 db clean 命令修剪一些旧数据。谨慎使用。

处理冲突/复杂的 Python 依赖项

Airflow 有许多 Python 依赖项,有时 Airflow 依赖项与您的任务代码期望的依赖项冲突。由于默认情况下,Airflow 环境只是一组 Python 依赖项和一个 Python 环境,因此有时也会出现某些任务需要的依赖项与其它任务不同,并且这些任务之间的依赖项基本上存在冲突的情况。

如果您使用预定义的 Airflow 运算符与外部服务进行通信,则没有太多选择,但通常这些运算符将具有与基本 Airflow 依赖项不冲突的依赖项。Airflow 使用约束机制,这意味着您有一组“固定”的依赖项,社区保证可以使用这些依赖项安装 Airflow(包括所有社区提供商),而不会触发冲突。但是,您可以独立升级提供商,并且它们的约束不会限制您,因此发生冲突依赖项的可能性较低(您仍然需要测试这些依赖项)。因此,当您使用预定义运算符时,您几乎不会遇到冲突依赖项的问题。

但是,当您以更“现代的方式”接近 Airflow 时,您使用 TaskFlow API,并且您的大多数运算符都是使用自定义 Python 代码编写的,或者当您想要编写自己的自定义运算符时,您可能会遇到您自定义代码所需的依赖项与 Airflow 的依赖项冲突,甚至您的几个自定义运算符的依赖项之间也存在冲突。

可以采用多种策略来缓解此问题。虽然处理自定义运算符中的依赖项冲突很困难,但当使用 airflow.operators.python.PythonVirtualenvOperatorairflow.operators.python.ExternalPythonOperator 时,实际上要容易得多 - 可以直接使用经典的“运算符”方法,或者如果您使用 TaskFlow,则可以使用 @task.virtualenv@task.external_python 修饰器来修饰任务。

让我们从最容易实现(有一些限制和开销)的策略开始,然后逐步介绍那些需要更改 Airflow 部署的策略。

使用 PythonVirtualenvOperator

这是最简单使用且限制最多的策略。airflow.operators.python.PythonVirtualenvOperator 允许您动态创建一个虚拟环境,您的 Python 可调用函数将在其中执行。在 使用 TaskFlow 中描述的现代 TaskFlow 方法中,也可以通过使用 @task.virtualenv 修饰器来修饰您的可调用对象来实现(推荐使用该运算符的方法)。每个 airflow.operators.python.PythonVirtualenvOperator 任务都可以有其自己的独立的 Python 虚拟环境(每次运行任务时动态创建),并且可以指定需要安装的细粒度要求,以便执行该任务。

该运算符负责:

  • 基于您的环境创建虚拟环境

  • 序列化您的 Python 可调用对象并将其传递给虚拟环境 Python 解释器执行

  • 执行该可调用对象并检索其结果,并在指定时通过 xcom 推送

该运算符的优点是:

  • 无需预先准备 venv。它将在任务运行之前动态创建,并在任务完成后删除,因此除了在您的 airflow 依赖项中拥有 virtualenv 包之外,没有什么特别的,可以利用多个虚拟环境

  • 您可以在同一工作器上运行具有不同依赖项集的不同任务 - 因此可以重复使用内存资源(但请参阅下文关于创建 venv 所涉及的 CPU 开销)。

  • 在较大的安装中,DAG 作者无需要求任何人为您创建 venv。作为 DAG 作者,您只需安装 virtualenv 依赖项,就可以根据自己的需要指定和修改环境。

  • 部署要求没有变化 - 无论您使用本地虚拟环境、Docker 还是 Kubernetes,任务都将正常工作,而无需向部署添加任何内容。

  • 无需了解更多关于容器、Kubernetes 作为 DAG 作者的信息。只需了解 Python 要求即可通过这种方式编写 DAG。

此运算符引入了一些限制和开销:

  • 您的 Python 可调用对象必须是可序列化的。许多 Python 对象无法使用标准的 pickle 库进行序列化。您可以通过使用 dill 库来缓解其中一些限制,但即使该库也无法解决所有序列化限制。

  • Airflow 环境中不可用的所有依赖项必须在您使用的可调用对象中本地导入,并且 DAG 的顶级 Python 代码不应导入/使用这些库。

  • 虚拟环境在同一操作系统中运行,因此它们不能具有冲突的系统级依赖项(aptyum 可安装的软件包)。只有 Python 依赖项可以在这些环境中独立安装。

  • 该运算符会增加运行每个任务的 CPU、网络和经过时间的开销 - Airflow 必须为每个任务从头开始重新创建虚拟环境。

  • 工作器需要访问 PyPI 或私有存储库才能安装依赖项

  • 动态创建虚拟环境容易出现瞬时故障(例如,当您的存储库不可用或在访问存储库时出现网络问题)。

  • 很容易陷入“过于”动态的环境 - 因为您安装的依赖项可能会升级,并且它们的传递依赖项可能会获得独立升级,您最终可能会遇到您的任务因有人发布新版本的依赖项而停止工作的情况,或者您可能会成为“供应链”攻击的受害者,其中新版本的依赖项可能变得恶意。

  • 任务仅通过在不同的环境中运行来彼此隔离。这使得运行任务仍然会相互干扰 - 例如,在同一工作器上执行的后续任务可能会受到先前创建/修改文件的任务的影响。

您可以在 Taskflow Virtualenv 示例中看到使用 airflow.operators.python.PythonVirtualenvOperator 的详细示例。

使用 ExternalPythonOperator

版本 2.4 中的新增功能。

稍微复杂一些,但开销、安全性、稳定性问题要少得多的是使用 airflow.operators.python.ExternalPythonOperator`。在 使用 TaskFlow 中描述的现代 TaskFlow 方法中,也可以通过使用 @task.external_python 修饰器来修饰您的可调用对象来实现(推荐使用该运算符的方法)。但是,它要求您拥有一个预先存在、不可变的 Python 环境,该环境已预先准备好。与 airflow.operators.python.PythonVirtualenvOperator 不同,您不能向这种预先存在的环境中添加新的依赖项。您需要的所有依赖项都应在您的环境中预先添加,并且在您的 Airflow 在分布式环境中运行时,在所有工作器中都可用。

这样可以避免重新创建虚拟环境的开销和问题,但它们必须与 Airflow 安装一起准备和部署。通常需要管理 Airflow 安装的人员参与,在较大的安装中,这些人通常与 DAG 作者(DevOps/系统管理员)不同。

可以通过多种方式准备这些虚拟环境 - 如果您使用 LocalExecutor,它们只需安装在运行调度程序的计算机上即可,如果您使用分布式 Celery virtualenv 安装,则应该有一个管道在多台计算机上安装这些虚拟环境,最后,如果您使用 Docker 镜像(例如通过 Kubernetes),则应将虚拟环境创建添加到自定义镜像构建管道中。

该运算符的优点是:

  • 运行任务时没有设置开销。当您开始运行任务时,虚拟环境已准备就绪。

  • 您可以在同一工作节点上运行具有不同依赖项集的任务,从而复用所有资源。

  • 工作节点无需访问 PyPI 或私有仓库。减少了由网络导致的瞬态错误的可能性。

  • 依赖项可以由管理员和您的安全团队预先审查,不会动态添加意外的新代码。这有利于安全性和稳定性。

  • 对您的部署影响有限 - 您无需切换到 Docker 容器或 Kubernetes 即可充分利用该操作符。

  • 作为 DAG 作者,无需学习更多关于容器或 Kubernetes 的知识。只需 Python 和 requirements 的知识即可编写 DAG。

缺点

  • 您的环境需要预先准备好虚拟环境。这通常意味着您无法即时更改它,添加新的或更改 requirements 至少需要重新部署 Airflow,并且在处理新版本时迭代时间可能会更长。

  • 您的 Python 可调用对象必须是可序列化的。许多 Python 对象无法使用标准的 pickle 库进行序列化。您可以通过使用 dill 库来缓解其中一些限制,但即使该库也无法解决所有序列化限制。

  • 所有在 Airflow 环境中不可用的依赖项都必须在您使用的可调用对象中本地导入,并且 DAG 的顶层 Python 代码不应导入/使用这些库。

  • 虚拟环境在同一操作系统中运行,因此它们不能有冲突的系统级依赖项(aptyum 可安装的软件包)。只有 Python 依赖项可以独立安装在这些环境中。

  • 任务之间仅通过在不同的环境中运行进行隔离。这使得正在运行的任务仍然可能相互干扰 - 例如,在同一工作节点上执行的后续任务可能会受到之前任务创建/修改文件等的影响。

您可以将 PythonVirtualenvOperatorExternalPythonOperator 视为对应物 - 它们使从开发阶段平稳过渡到生产阶段更加容易。作为 DAG 作者,您通常会迭代依赖项并使用 PythonVirtualenvOperator 开发 DAG(因此使用 @task.virtualenv 装饰器装饰您的任务),而在迭代和更改后,您可能希望将其更改为生产环境,切换到 ExternalPythonOperator(和 @task.external_python),在您的 DevOps/系统管理员团队将您的新依赖项部署到生产环境中预先存在的 virtualenv 之后。这样做的好处是,您可以随时切换回装饰器,并继续使用 PythonVirtualenvOperator “动态”地进行开发。

您可以在 airflow.operators.python.ExternalPythonOperatorTaskflow External Python 示例中查看使用方法的详细示例。

使用 DockerOperator 或 Kubernetes Pod Operator

另一种策略是使用 airflow.providers.docker.operators.docker.DockerOperator airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator。这些操作符要求 Airflow 可以访问 Docker 引擎或 Kubernetes 集群。

与 Python 操作符的情况类似,如果您想使用这些操作符执行可调用的 Python 代码,则 taskflow 装饰器会很方便。

但是,这涉及到更多内容 - 如果您想使用这种方法,则需要了解 Docker/Kubernetes Pod 的工作原理,但是任务之间是完全隔离的,并且您甚至不限于运行 Python 代码。您可以使用任何您想要的编程语言编写任务。此外,您的依赖项完全独立于 Airflow 的依赖项(包括系统级依赖项),因此,如果您的任务需要非常不同的环境,这是最佳选择。

2.2 版本新增功能。

从 Airflow 2.2 版本开始,您可以使用 @task.docker 装饰器使用 DockerOperator 运行您的函数。

版本 2.4 中的新增功能。

从 Airflow 2.2 版本开始,您可以使用 @task.kubernetes 装饰器使用 KubernetesPodOperator 运行您的函数。

使用这些操作符的好处是

  • 您可以运行具有不同 Python 和系统级依赖项集的任务,甚至可以使用完全不同的语言或不同的处理器架构(x86 与 arm)编写的任务。

  • 用于运行任务的环境享有容器的优化和不可变性,其中一组类似的依赖项可以有效地重用镜像的许多缓存层,因此该环境针对您拥有多个相似但不同的环境的情况进行了优化。

  • 依赖项可以由管理员和您的安全团队预先审查,不会动态添加意外的新代码。这有利于安全性和稳定性。

  • 任务之间完全隔离。它们不能以其他方式相互影响,只能使用标准的 Airflow XCom 机制。

缺点

  • 启动任务时会产生开销。通常没有动态创建虚拟环境时那么大,但仍然很显著(尤其是对于 KubernetesPodOperator)。

  • 在 TaskFlow 装饰器的情况下,需要序列化整个要调用的方法并将其发送到 Docker 容器或 Kubernetes Pod,并且对方法的大小存在系统级别的限制。在远程端序列化、发送和最终反序列化方法也会增加开销。

  • 需要多个进程会带来资源开销。在运行这两个操作符的情况下运行任务至少需要两个进程 - 一个进程(在 Docker 容器或 Kubernetes Pod 中运行)执行任务,以及 Airflow 工作节点中的一个监督进程,该进程将作业提交到 Docker/Kubernetes 并监控执行情况。

  • 您的环境需要预先准备好容器镜像。这通常意味着您无法即时更改它们。添加系统依赖项、修改或更改 Python requirements 需要重建和发布镜像(通常在您的私有注册表中)。当您处理新的依赖项时,迭代时间通常会更长,并且需要正在迭代的开发人员在迭代期间构建和使用他们自己的镜像(如果他们更改依赖项)。适当的部署管道对于能够可靠地维护您的部署至关重要。

  • 如果想通过装饰器运行您的 Python 可调用对象,则该对象必须是可序列化的,在这种情况下,所有在 Airflow 环境中不可用的依赖项都必须在您使用的可调用对象中本地导入,并且 DAG 的顶层 Python 代码不应导入/使用这些库。

  • 您需要了解更多关于 Docker 容器或 Kubernetes 工作原理的详细信息。这两者提供的抽象是“有漏洞的”,因此您需要更多地了解资源、网络、容器等,才能编写使用这些操作符的 DAG。

您可以在 Taskflow Docker 示例中查看使用 airflow.operators.providers.Docker 的详细示例,并在airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator Taskflow Kubernetes 示例中查看使用 airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator的详细示例。

使用多个 Docker 镜像和 Celery 队列

有一种可能性(尽管需要深入了解 Airflow 部署)可以使用多个独立的 Docker 镜像运行 Airflow 任务。这可以通过将不同的任务分配给不同的队列,并配置 Celery 工作节点对不同的队列使用不同的镜像来实现。然而,这(至少目前)需要大量的手动部署配置以及对 Airflow、Celery 和 Kubernetes 工作原理的深入了解。此外,这会为运行任务引入相当多的开销 - 资源复用的机会更少,并且在不影响性能和稳定性的情况下,很难对这种部署进行资源成本的微调。

一种可能使其更有用的方法是 AIP-46 Airflow 任务和 DAG 解析的运行时隔离AIP-43 DAG 处理器分离的完成。在这些实现之前,使用这种方法的好处非常少,不建议使用。

然而,当这些 AIP 实现后,这将为多租户方法开启可能性,多个团队将能够拥有完全隔离的依赖项集,这些依赖项将用于 DAG 的整个生命周期 - 从解析到执行。

这个条目有用吗?