使用 TaskFlow API 编写更 Pythonic 的 DAG¶
在第一个教程中,你使用 PythonOperator
等传统 Operator 构建了第一个 Airflow DAG。现在让我们看看使用 TaskFlow API(Airflow 2.0 引入)编写工作流的更现代、更 Pythonic 的方法。
TaskFlow API 旨在让你的代码更简洁、更清晰、更易于维护。你只需编写普通的 Python 函数,用装饰器修饰它们,剩下的交给 Airflow 处理——包括任务创建、依赖连接和任务之间的数据传递。
在本教程中,我们将使用 TaskFlow API 创建一个简单的 ETL(提取 → 转换 → 加载)流水线。让我们开始吧!
整体概览:TaskFlow 流水线¶
下图展示了使用 TaskFlow 构建的完整流水线。如果有些地方看起来不熟悉,请不要担心——我们会一步一步详细讲解。
src/airflow/example_dags/tutorial_taskflow_api.py
import json
import pendulum
from airflow.sdk import dag, task
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_taskflow_api():
"""
### TaskFlow API Tutorial Documentation
This is a simple data pipeline example which demonstrates the use of
the TaskFlow API using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.org.cn/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
@task()
def extract():
"""
#### Extract task
A simple Extract task to get data ready for the rest of the data
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
"""
#### Load task
A simple Load task which takes in the result of the Transform task and
instead of saving it to end user review, just prints it out.
"""
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_taskflow_api()
步骤 1:定义 DAG¶
和以前一样,你的 DAG 是 Airflow 加载和解析的 Python 脚本。但这次,我们使用 @dag
装饰器来定义它。
src/airflow/example_dags/tutorial_taskflow_api.py
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_taskflow_api():
"""
### TaskFlow API Tutorial Documentation
This is a simple data pipeline example which demonstrates the use of
the TaskFlow API using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.org.cn/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
为了让 Airflow 能够发现这个 DAG,我们可以调用用 @dag
装饰的 Python 函数。
src/airflow/example_dags/tutorial_taskflow_api.py
tutorial_taskflow_api()
版本 2.4 新增功能:如果你使用 @dag
装饰器或在 with
块中定义 DAG,则不再需要将其赋给全局变量。Airflow 将自动找到它。
你可以在 Airflow UI 中可视化你的 DAG!加载 DAG 后,导航到 Graph View 查看任务之间的连接方式。
步骤 2:使用 @task
编写任务¶
使用 TaskFlow,每个任务都只是一个普通的 Python 函数。你可以使用 @task
装饰器将其转换为 Airflow 可以调度和运行的任务。以下是 extract
任务
src/airflow/example_dags/tutorial_taskflow_api.py
@task()
def extract():
"""
#### Extract task
A simple Extract task to get data ready for the rest of the data
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
函数的返回值会自动传递给下一个任务——无需手动使用 XComs
。在底层,TaskFlow 使用 XComs
自动管理数据传递,抽象了先前方法中手动管理 XCom 的复杂性。你将使用相同的模式定义 transform
和 load
任务。
请注意上面使用了 @task(multiple_outputs=True)
——这告诉 Airflow 该函数返回一个字典,其值应该被拆分成单独的 XCom。返回字典中的每个键都成为一个独立的 XCom 条目,这使得在下游任务中轻松引用特定值成为可能。如果你省略 multiple_outputs=True
,则整个字典将作为一个单独的 XCom 存储,并且必须作为一个整体访问。
步骤 3:构建流程¶
定义任务后,你可以像调用 Python 函数一样调用它们来构建流水线。Airflow 利用这种函数调用来设置任务依赖关系和管理数据传递。
src/airflow/example_dags/tutorial_taskflow_api.py
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
就这么简单!Airflow 可以仅凭这段代码就知道如何调度和编排你的流水线。
运行你的 DAG¶
如何启用和触发你的 DAG
导航到 Airflow UI。
在列表中找到你的 DAG 并点击开关按钮启用它。
你可以通过点击“Trigger DAG”按钮手动触发它,或者等待它按计划运行。
幕后发生了什么?¶
如果你使用过 Airflow 1.x,这可能感觉像魔法一样。让我们比较一下底层发生的事情。
“旧方法”:手动连接和 XComs¶
在 TaskFlow API 之前,你必须使用 PythonOperator
等 Operator,并使用 XComs
在任务之间手动传递数据。
以下是使用传统方法时同一个 DAG 的样子
import json
import pendulum
from airflow.sdk import DAG, PythonOperator
def extract():
# Old way: simulate extracting data from a JSON string
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
def transform(ti):
# Old way: manually pull from XCom
order_data_dict = ti.xcom_pull(task_ids="extract")
total_order_value = sum(order_data_dict.values())
return {"total_order_value": total_order_value}
def load(ti):
# Old way: manually pull from XCom
total = ti.xcom_pull(task_ids="transform")["total_order_value"]
print(f"Total order value is: {total:.2f}")
with DAG(
dag_id="legacy_etl_pipeline",
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
extract_task = PythonOperator(task_id="extract", python_callable=extract)
transform_task = PythonOperator(task_id="transform", python_callable=transform)
load_task = PythonOperator(task_id="load", python_callable=load)
extract_task >> transform_task >> load_task
注意
此版本产生与 TaskFlow API 示例相同的结果,但需要显式管理 XComs
和任务依赖项。
TaskFlow 方法¶
使用 TaskFlow,所有这些都自动处理。
src/airflow/example_dags/tutorial_taskflow_api.py
import json
import pendulum
from airflow.sdk import dag, task
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_taskflow_api():
"""
### TaskFlow API Tutorial Documentation
This is a simple data pipeline example which demonstrates the use of
the TaskFlow API using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.org.cn/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
@task()
def extract():
"""
#### Extract task
A simple Extract task to get data ready for the rest of the data
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
"""
#### Load task
A simple Load task which takes in the result of the Transform task and
instead of saving it to end user review, just prints it out.
"""
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_taskflow_api()
Airflow 仍然使用 XComs
并构建依赖关系图——它只是被抽象出来,以便你可以专注于业务逻辑。
XComs 工作原理¶
TaskFlow 返回值自动存储为 XComs
。这些值可以在 UI 的“XCom”选项卡下查看。对于传统 Operator,手动使用 xcom_pull()
仍然是可能的。
错误处理和重试¶
你可以使用装饰器轻松配置任务的重试次数。例如,你可以在任务装饰器中直接设置最大重试次数
@task(retries=3)
def my_task(): ...
这有助于确保瞬时故障不会导致任务失败。
任务参数化¶
你可以在多个 DAG 中复用装饰器任务,并覆盖 task_id
或 retries
等参数。
start = add_task.override(task_id="start")(1, 2)
你甚至可以从共享模块导入装饰器任务。
接下来探索什么¶
干得好!你现在已经使用 TaskFlow API 编写了第一个流水线。想知道接下来可以探索什么吗?
向 DAG 添加一个新任务——也许是过滤或验证步骤
修改返回值并传递多个输出
使用
.override(task_id="...")
探索重试和覆盖功能打开 Airflow UI 并检查数据如何在任务之间流动,包括任务日志和依赖项
另请参阅
继续下一步:构建一个简单的数据流水线
在TaskFlow API 文档中了解更多信息,或继续阅读下方的高级 TaskFlow 模式
在核心概念中阅读有关 Airflow 概念的信息
高级 TaskFlow 模式¶
熟悉基础知识后,可以尝试以下一些强大的技术。
复用装饰器任务¶
你可以在多个 DAG 或 DAG 运行中复用装饰器任务。这对于可复用的工具函数或共享业务规则等常见逻辑尤其有用。使用 .override()
可以自定义任务元数据,例如 task_id
或 retries
。
start = add_task.override(task_id="start")(1, 2)
你甚至可以从共享模块导入装饰器任务。
处理冲突的依赖项¶
有时任务需要与 DAG 其余部分不同的 Python 依赖项——例如,专门的库或系统级软件包。TaskFlow 支持多种执行环境来隔离这些依赖项。
动态创建的 Virtualenv
在任务运行时创建临时的 virtualenv。非常适合实验性或动态任务,但可能会有冷启动开销。
src/airflow/example_dags/example_python_decorator.py
@task.virtualenv(
task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = callable_virtualenv()
外部 Python 环境
使用预安装的 Python 解释器执行任务——非常适合一致的环境或共享的 virtualenv。
src/airflow/example_dags/example_python_decorator.py
@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = callable_external_python()
Docker 环境
在 Docker 容器中运行你的任务。对于打包任务所需的一切很有用——但需要你的 worker 上有 Docker。
docker/tests/system/docker/example_taskflow_api_docker_virtualenv.py
@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
注意
需要 Airflow 2.2 和 Docker provider。
KubernetesPodOperator
在 Kubernetes Pod 中运行你的任务,与主 Airflow 环境完全隔离。非常适合大型任务或需要自定义运行时的任务。
cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_decorator.py
@task.kubernetes(
image="python:3.9-slim-buster",
name="k8s_test",
namespace="default",
in_cluster=False,
config_file="/path/to/.kube/config",
)
def execute_in_k8s_pod():
import time
print("Hello from k8s pod")
time.sleep(2)
@task.kubernetes(image="python:3.9-slim-buster", namespace="default", in_cluster=False)
def print_pattern():
n = 5
for i in range(n):
# inner loop to handle number of columns
# values changing acc. to outer loop
for _ in range(i + 1):
# printing stars
print("* ", end="")
# ending line after each row
print("\r")
execute_in_k8s_pod_instance = execute_in_k8s_pod()
print_pattern_instance = print_pattern()
execute_in_k8s_pod_instance >> print_pattern_instance
注意
需要 Airflow 2.4 和 Kubernetes provider。
使用 Sensor¶
使用 @task.sensor
使用 Python 函数构建轻量级、可复用的 Sensor。这些 Sensor 支持 poke 和 reschedule 两种模式。
src/airflow/example_dags/example_sensor_decorator.py
import pendulum
from airflow.sdk import PokeReturnValue, dag, task
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def example_sensor_decorator():
# Using a sensor operator to wait for the upstream data to be ready.
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
return PokeReturnValue(is_done=True, xcom_value="xcom_value")
@task
def dummy_operator() -> None:
pass
wait_for_upstream() >> dummy_operator()
tutorial_etl_dag = example_sensor_decorator()
与传统任务混用¶
你可以将装饰器任务与经典的 Operator 结合使用。这在使用社区 provider 或逐步迁移到 TaskFlow 时很有帮助。
你可以使用 >>
链式连接 TaskFlow 和传统任务,或使用 .output
属性传递数据。
TaskFlow 中的模板¶
就像传统任务一样,装饰的 TaskFlow 函数支持模板化参数——包括从文件加载内容或使用运行时参数。
运行你的可调用对象时,Airflow 会传递一组关键字参数,这些参数可以在你的函数中使用。这组 kwargs 与你在 Jinja 模板中可以使用的完全对应。为此,你可以将你希望在函数中接收的上下文键作为关键字参数添加。
例如,下面代码块中的可调用对象将获取 ti
和 next_ds
上下文变量的值
@task
def my_python_callable(*, ti, next_ds):
pass
你也可以选择使用 **kwargs
接收整个上下文。请注意,这可能会稍微影响性能,因为 Airflow 需要展开整个上下文,其中可能包含许多你实际上不需要的东西。因此,更建议使用显式参数,如上一段所示。
@task
def my_python_callable(**kwargs):
ti = kwargs["ti"]
next_ds = kwargs["next_ds"]
另外,有时你可能希望在调用栈深处访问上下文,但又不希望从任务可调用对象中传递上下文变量。你仍然可以通过 get_current_context
方法访问执行上下文。
from airflow.sdk import get_current_context
def some_function_in_your_library():
context = get_current_context()
ti = context["ti"]
传递给装饰函数的参数会自动模板化。你也可以使用 templates_exts
对文件进行模板化处理。
@task(templates_exts=[".sql"])
def read_sql(sql): ...
条件执行¶
使用 @task.run_if()
或 @task.skip_if()
根据运行时动态条件控制任务是否运行——而无需改变你的 DAG 结构。
@task.run_if(lambda ctx: ctx["task_instance"].task_id == "run")
@task.bash()
def echo():
return "echo 'run'"
下一步¶
现在你已经了解了如何使用 TaskFlow API 构建简洁、易于维护的 DAG,以下是一些不错的下一步建议。
在资产感知调度中探索资产感知工作流
深入了解调度选项中的调度模式
转到下一个教程:构建一个简单的数据流水线