使用 TaskFlow API 的 Pythonic Dags

在第一个教程中,你使用传统的算子(如 BashOperator)构建了第一个 Airflow Dag。现在让我们看看一种更现代、更符合 Python 风格的编写工作流的方式——TaskFlow API(在 Airflow 2.0 中引入)。

TaskFlow API 旨在让你的代码更简洁、清晰、更易维护。你编写普通的 Python 函数并进行装饰,Airflow 会处理其余工作——包括任务创建、依赖连线以及任务之间的数据传递。

在本教程中,我们将使用 TaskFlow API 创建一个简单的 ETL 流水线——提取 → 转换 → 加载。让我们深入了解吧!

全局概览:TaskFlow 流水线

下面是使用 TaskFlow 实现的完整流水线。即使有些内容看起来陌生也别担心——我们会一步一步拆解。

airflow/example_dags/tutorial_taskflow_api.py[source]


import json

import pendulum

from airflow.sdk import dag, task
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.org.cn/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
tutorial_taskflow_api()

步骤 1:定义 Dag

和之前一样,Dag 是一个由 Airflow 加载和解析的 Python 脚本。但这次我们使用 @dag 装饰器来定义它。

airflow/example_dags/tutorial_taskflow_api.py[source]

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.org.cn/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

为了让 Airflow 能发现此 Dag,我们可以调用被 @dag 装饰的 Python 函数。

airflow/example_dags/tutorial_taskflow_api.py[source]

tutorial_taskflow_api()

自 2.4 版起更改:如果你使用 @dag 装饰器或在 with 块中定义 Dag,就不需要再将其赋值给全局变量。Airflow 会自动发现它。

你可以在 Airflow UI 中可视化你的 Dag!Dag 加载后,前往 Graph View(图形视图)查看任务之间的连接。

步骤 2:使用 @task 编写任务

使用 TaskFlow 时,每个任务就是普通的 Python 函数。你可以使用 @task 装饰器将其转换为 Airflow 可调度运行的任务。这里是 extract 任务。

airflow/example_dags/tutorial_taskflow_api.py[source]

@task()
def extract():
    """
    #### Extract task
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict


函数的返回值会自动传递给下一个任务——不需要手动使用 XComs。底层上,TaskFlow 使用 XComs 自动管理数据传递,抽象掉了之前手动管理 XCom 的复杂性。你将在后面以相同模式定义 transformload 任务。

注意上面使用了 @task(multiple_outputs=True) —— 这告诉 Airflow 函数返回的是一个字典,需拆分为多个独立的 XCom。返回字典中的每个键都会成为各自的 XCom 条目,便于在下游任务中引用特定值。如果省略 multiple_outputs=True,整个字典会作为单个 XCom 存储,需要整体访问。

步骤 3:构建流水线

任务定义好后,你只需像调用普通 Python 函数一样调用它们即可构建流水线。Airflow 通过这种函数调用方式设置任务依赖并管理数据传递。

airflow/example_dags/tutorial_taskflow_api.py[source]

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

就这些!Airflow 仅凭这段代码就能调度和编排你的流水线。

运行你的 Dag

启用并触发你的 Dag

  1. 前往 Airflow UI。

  2. 在列表中找到你的 Dag,点击切换按钮以启用它。

  3. 你可以点击 “Trigger Dag” 按钮手动触发,或等待其按调度运行。

幕后究竟发生了什么?

如果你使用过 Airflow 1.x,可能会觉得这像是魔法。让我们比较一下底层到底发生了什么。

“旧方式”:手动连线和 XComs

在 TaskFlow API 之前,你必须使用像 PythonOperator 之类的算子,并手动通过 XComs 在任务之间传递数据。

下面是使用传统方法实现相同 Dag 的示例

import json
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator


def extract():
    # Old way: simulate extracting data from a JSON string
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


def transform(ti):
    # Old way: manually pull from XCom
    order_data_dict = ti.xcom_pull(task_ids="extract")
    total_order_value = sum(order_data_dict.values())
    return {"total_order_value": total_order_value}


def load(ti):
    # Old way: manually pull from XCom
    total = ti.xcom_pull(task_ids="transform")["total_order_value"]
    print(f"Total order value is: {total:.2f}")


with DAG(
    dag_id="legacy_etl_pipeline",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    extract_task >> transform_task >> load_task

注意

该版本可产生与 TaskFlow API 示例相同的结果,但需要显式管理 XComs 和任务依赖。

TaskFlow 的方式

使用 TaskFlow 时,所有这些都会自动处理。

airflow/example_dags/tutorial_taskflow_api.py[source]


import json

import pendulum

from airflow.sdk import dag, task
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.org.cn/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
tutorial_taskflow_api()


Airflow 仍然使用 XComs 并构建依赖图——只是被抽象掉了,这样你可以专注于业务逻辑。

如何使用 XComs

TaskFlow 的返回值会自动存储为 XComs。可以在 UI 的 “XCom” 选项卡中查看这些值。对于传统算子,仍然可以手动调用 xcom_pull()

错误处理与重试

你可以使用装饰器轻松为任务配置重试。例如,可以在任务装饰器中直接设置最大重试次数。

@task(retries=3)
def my_task(): ...

这有助于确保短暂的失败不会导致任务失败。

任务参数化

你可以在多个 Dag 中复用已装饰的任务,并覆盖诸如 task_idretries 等参数。

start = add_task.override(task_id="start")(1, 2)

甚至可以从共享模块导入已装饰的任务。

接下来可以探索的内容

干得好!你已经使用 TaskFlow API 编写了第一个流水线。想了解下一步该怎么做吗?

  • 向 Dag 添加新任务——例如过滤或校验步骤

  • 修改返回值并传递多个输出

  • 通过 .override(task_id="…") 探索重试和覆盖

  • 打开 Airflow UI 并检查任务之间的数据流动,包括任务日志和依赖关系

另请参阅

高级 TaskFlow 模式

当你熟悉基础后,这里有几种强大的技巧可以尝试。

复用已装饰的任务

你可以在多个 Dag 或 Dag 运行之间复用装饰任务。这对通用逻辑(如可复用的工具或共享业务规则)特别有用。使用 .override() 可自定义任务元数据,如 task_idretries

start = add_task.override(task_id="start")(1, 2)

甚至可以从共享模块导入已装饰的任务。

处理冲突的依赖关系

有时任务需要与 Dag 其他部分不同的 Python 依赖——例如专门的库或系统级软件包。TaskFlow 支持多种执行环境来隔离这些依赖。

动态创建的 Virtualenv

在任务运行时创建临时 virtualenv。适合实验性或动态任务,但可能会有冷启动开销。

/opt/airflow/providers/standard/src/airflow/providers/standard/example_dags/example_python_decorator.py

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = callable_virtualenv()

外部 Python 环境

使用预装的 Python 解释器执行任务——适用于一致的环境或共享 virtualenv。

/opt/airflow/providers/standard/src/airflow/providers/standard/example_dags/example_python_decorator.py

@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = callable_external_python()

Docker 环境

在 Docker 容器中运行任务。适合打包任务所需的一切——但要求工作节点上已安装 Docker。

/opt/airflow/providers/docker/tests/system/docker/example_taskflow_api_docker_virtualenv.py

@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}


注意

需要 Airflow 2.2 以及 Docker provider。

KubernetesPodOperator

在 Kubernetes pod 中运行任务,完全与主 Airflow 环境隔离。适合大任务或需要自定义运行时的任务。

/opt/airflow/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_decorator.py

@task.kubernetes(
    image="python:3.9-slim-buster",
    name="k8s_test",
    namespace="default",
    in_cluster=False,
    config_file="/path/to/.kube/config",
)
def execute_in_k8s_pod():
    import time

    print("Hello from k8s pod")
    time.sleep(2)

@task.kubernetes(image="python:3.9-slim-buster", namespace="default", in_cluster=False)
def print_pattern():
    n = 5
    for i in range(n):
        # inner loop to handle number of columns
        # values changing acc. to outer loop
        for _ in range(i + 1):
            # printing stars
            print("* ", end="")

        # ending line after each row
        print("\r")

execute_in_k8s_pod_instance = execute_in_k8s_pod()
print_pattern_instance = print_pattern()

execute_in_k8s_pod_instance >> print_pattern_instance

注意

需要 Airflow 2.4 以及 Kubernetes provider。

使用传感器

使用 @task.sensor 通过 Python 函数构建轻量、可复用的传感器。这两种模式都支持 poke 和 reschedule。

/opt/airflow/providers/standard/src/airflow/providers/standard/example_dags/example_sensor_decorator.py


import pendulum

from airflow.sdk import PokeReturnValue, dag, task
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_sensor_decorator():
    # Using a sensor operator to wait for the upstream data to be ready.
    @task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
    def wait_for_upstream() -> PokeReturnValue:
        return PokeReturnValue(is_done=True, xcom_value="xcom_value")
    @task
    def dummy_operator() -> None:
        pass
    wait_for_upstream() >> dummy_operator()
tutorial_etl_dag = example_sensor_decorator()

与传统任务混合使用

你可以将装饰任务与传统算子结合使用。这在使用社区提供者或逐步迁移到 TaskFlow 时很有帮助。

可以使用 >> 链接 TaskFlow 与传统任务,或通过 .output 属性传递数据。

TaskFlow 中的模板化

与传统任务一样,装饰的 TaskFlow 函数支持模板化参数——包括从文件加载内容或使用运行时参数。

在运行你的可调用对象时,Airflow 会传入一组关键字参数,可在函数中使用。这些 kwargs 与你在 Jinja 模板中可使用的变量完全对应。为实现此功能,你可以将希望在函数中获取的上下文键作为关键字参数添加。

例如,下方代码块中的可调用对象会获取上下文变量 tinext_ds 的值。

@task
def my_python_callable(*, ti, next_ds):
    pass

你也可以选择使用 **kwargs 接收完整上下文。需注意,这可能会带来轻微的性能损失,因为 Airflow 需要展开可能包含大量不必要信息的完整上下文。因此更推荐使用显式参数,如前段所示。

@task
def my_python_callable(**kwargs):
    ti = kwargs["ti"]
    next_ds = kwargs["next_ds"]

此外,有时你可能想在深层调用栈中访问上下文,但不想从任务可调用对象传递上下文变量。仍可通过 get_current_context 方法获取执行上下文。

from airflow.sdk import get_current_context


def some_function_in_your_library():
    context = get_current_context()
    ti = context["ti"]

传递给装饰函数的参数会自动进行模板化。你也可以使用 templates_exts 对文件进行模板化。

@task(templates_exts=[".sql"])
def read_sql(sql): ...

条件执行

使用 @task.run_if()@task.skip_if() 根据运行时的动态条件控制任务是否执行——无需修改 Dag 结构。

@task.run_if(lambda ctx: ctx["task_instance"].task_id == "run")
@task.bash()
def echo():
    return "echo 'run'"

接下来会怎样

既然你已经了解如何使用 TaskFlow API 构建简洁、可维护的 Dag,下面是一些不错的后续步骤。

此条目是否有帮助?