XComs¶
XComs(“跨通信”的缩写)是一种允许任务之间相互通信的机制,因为默认情况下,任务是完全隔离的,并且可能在完全不同的机器上运行。
一个 XCom 由一个 key
(本质上是它的名称) 以及它来自的 task_id
和 dag_id
标识。它们可以具有任何可序列化的值(包括使用 @dataclass
或 @attr.define
修饰的对象,请参见TaskFlow 参数:),但它们仅设计用于少量数据;不要使用它们传递大数据值,例如数据帧。
使用任务实例上的 xcom_push
和 xcom_pull
方法,可以显式地将 XCom “推送” 和 “拉取” 到/从它们的存储中。
要在名为 “task-1” 的任务中推送一个将由另一个任务使用的值
# pushes data in any_serializable_value into xcom with key "identifier as string"
task_instance.xcom_push(key="identifier as a string", value=any_serializable_value)
要在另一个任务中拉取在上面的代码中推送的值
# pulls the xcom variable with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")
许多操作符会自动将其结果推送到一个名为 return_value
的 XCom 键中,如果 do_xcom_push
参数设置为 True
(默认情况下),并且 @task
函数也会这样做。xcom_pull
如果没有传递 key,则默认使用 return_value
作为键,这意味着可以编写如下代码
# Pulls the return_value XCOM from "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')
你也可以在模板中使用 XComs
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
XComs 是变量的亲戚,主要区别在于 XComs 是按任务实例的,并且设计用于在 DAG 运行中进行通信,而变量是全局的,并且设计用于整体配置和值共享。
如果要一次推送多个 XCom 或重命名推送的 XCom 键,可以将 do_xcom_push
和 multiple_outputs
参数设置为 True
,然后返回一个值字典。
注意
如果第一次任务运行失败,那么每次重试任务时,XComs 将被清除,以使任务运行是幂等的。
对象存储 XCom 后端¶
默认的 XCom 后端是 BaseXCom
类,它将 XComs 存储在 Airflow 数据库中。这对于小值来说很好,但对于大值或大量的 XComs 来说可能会有问题。
要启用将 XComs 存储在对象存储中,可以将 xcom_backend
配置选项设置为 airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
。还需要将 xcom_objectstorage_path
设置为所需的位置。连接 ID 从您提供的 URL 的用户部分获取,例如 xcom_objectstorage_path = s3://conn_id@mybucket/key
。此外,xcom_objectstorage_threshold
必须大于 -1。任何小于阈值(以字节为单位)的对象都将存储在数据库中,任何大于阈值的对象都将放入对象存储中。这将允许混合设置。如果 XCom 存储在对象存储中,则会在数据库中保存一个引用。最后,您可以将 xcom_objectstorage_compression
设置为 fsspec 支持的压缩方法,例如 zip
或 snappy
,以便在将其存储在对象存储中之前压缩数据。
因此,例如,以下配置会将任何大于 1MB 的内容存储在 S3 中,并使用 gzip 进行压缩
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip
注意
压缩需要你的 Python 环境中安装了对它的支持。例如,要使用 snappy
压缩,你需要安装 python-snappy
。zip、gzip 和 bz2 可以开箱即用。
自定义 XCom 后端¶
XCom 系统具有可互换的后端,你可以通过 xcom_backend
配置选项设置正在使用的后端。
如果要实现自己的后端,则应子类化 BaseXCom
,并覆盖 serialize_value
和 deserialize_value
方法。
还有一个 orm_deserialize_value
方法,每当为 UI 或报告目的呈现 XCom 对象时都会调用该方法;如果你的 XCom 中有较大或检索成本较高的值,则应覆盖此方法以避免调用该代码(而是返回较轻、不完整的表示形式),以便 UI 保持响应速度。
你还可以覆盖 clear
方法,并在清除给定 DAG 和任务的结果时使用它。这使得自定义 XCom 后端更容易处理数据生命周期。
在容器中验证自定义 XCom 后端的使用¶
根据 Airflow 的部署位置,例如,本地、Docker、K8s 等,可以确保自定义 XCom 后端实际正在初始化,这可能很有用。例如,容器环境的复杂性可能会使确定你的后端在容器部署期间是否正确加载变得更加困难。幸运的是,以下指南可以帮助你建立对自定义 XCom 实现的信心。
如果你可以在 Airflow 容器中执行到终端,则可以打印出正在使用的实际 XCom 类
from airflow.models.xcom import XCom
print(XCom.__name__)