XComs¶
XComs(“跨通信”的简称)是一种机制,它允许 任务 相互通信,因为默认情况下,任务完全独立,并且可能在完全不同的计算机上运行。
XCom 由 key
(本质上是其名称)以及 task_id
和 dag_id
(其来源)标识。它们可以具有任何(可序列化的)值,但它们只设计用于少量数据;不要使用它们传递大值,如数据框。
XCom 使用任务实例上的 xcom_push
和 xcom_pull
方法明确地“推送”和“拉取”至/从其存储。
要在名为 “task-1” 的任务中推送一个值,该值将被另一个任务使用
# pushes data in any_serializable_value into xcom with key "identifier as string"
task_instance.xcom_push(key="identifier as a string", value=any_serializable_value)
要在不同的任务中拉取在上述代码中推送的值
# pulls the xcom variable with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")
许多操作员会自动将其结果推送到名为 return_value
的 XCom 键中,如果 do_xcom_push
参数设置为 True
(默认情况下为 True),@task
函数也会这样做。如果未向 xcom_pull
传递键,则它默认使用 return_value
作为键,这意味着可以编写如下代码
# Pulls the return_value XCOM from "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')
您还可以在 模板 中使用 XCom
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
XCom 是 变量 的一个亲戚,主要区别在于 XCom 是针对每个任务实例的,并且设计用于 DAG 运行中的通信,而变量是全局的,并且设计用于整体配置和值共享。
如果您想一次推送多个 XCom 或重命名推送的 XCom 键,可以使用将 do_xcom_push
和 multiple_outputs
参数设置为 True
,然后返回一个值字典。
注意
如果第一个任务运行未成功,则在每次重试任务时,XCom 都会被清除以使任务运行幂等。
对象存储 XCom 后端¶
默认的 XCom 后端是 BaseXCom
类,它将 XCom 存储在 Airflow 数据库中。对于小值来说这很好,但对于大值或大量 XCom 来说可能会有问题。
要启用将 XCom 存储在对象存储中,你可以将 xcom_backend
配置选项设置为 airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
。你还需要将 xcom_objectstorage_path
设置为所需的位置。连接 ID 从你将提供的 URL 的用户部分获取,例如 xcom_objectstorage_path = s3://conn_id@mybucket/key
。此外,xcom_objectstorage_threshold
必须大于 -1。任何小于阈值的字节对象都将存储在数据库中,而任何大于阈值的对象都将放入对象存储中。这将允许混合设置。如果 xcom 存储在对象存储中,则引用将保存在数据库中。最后,你可以将 xcom_objectstorage_compression
设置为 fsspec 支持的压缩方法,如 zip
或 snappy
,以在将数据存储在对象存储中之前对其进行压缩。
因此,例如,以下配置将把所有大于 1MB 的内容存储在 S3 中,并使用 gzip 对其进行压缩
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
xcom_objectstorage_threshold = 1048576
xcom_objectstoragee_compression = gzip
注意
压缩需要在你的 Python 环境中安装对其的支持。例如,要使用 snappy
压缩,你需要安装 python-snappy
。Zip、gzip 和 bz2 开箱即用。
自定义 XCom 后端¶
XCom 系统具有可互换的后端,你可以通过 xcom_backend
配置选项设置正在使用的后端。
如果您想实现自己的后端,您应该子类化 BaseXCom
,并覆盖 serialize_value
和 deserialize_value
方法。
还有一个 orm_deserialize_value
方法,它会在每次为 UI 或报告目的呈现 XCom 对象时调用;如果您在 XCom 中有大型或检索成本高的值,您应该覆盖此方法以避免调用该代码(而是返回一个更轻量、不完整的表示),以便 UI 保持响应。
您还可以覆盖 clear
方法,并在清除给定 DAG 和任务的结果时使用它。这允许自定义 XCom 后端更轻松地处理数据生命周期。
在容器中使用自定义 XCom 后端¶
根据 Airflow 的部署位置,即本地、Docker、K8s 等,确保实际初始化了一个自定义 XCom 后端会很有用。例如,容器环境的复杂性可能会让在容器部署期间更难以确定您的后端是否正确加载。幸运的是,可以使用以下指南来帮助您对自定义 XCom 实现建立信心。
首先,如果您可以在容器中执行到终端,那么您应该能够执行
from airflow.models.xcom import XCom
print(XCom.__name__)
它将打印正在使用的实际类。
您还可以检查 Airflow 的配置
from airflow.settings import conf
conf.get("core", "xcom_backend")
通过 Helm 在 K8s 中使用自定义后端¶
在 K8s 中运行自定义 XCom 后端会给您的 Airflow 部署带来更多复杂性。简单来说,有时会出现难以调试的问题。
例如,如果您在图表 values.yaml
(通过 xcom_backend
配置)中定义了一个自定义 XCom 后端,并且 Airflow 无法加载该类,那么整个图表部署将失败,每个 pod 容器都会尝试一次又一次地重新启动。
在 K8s 中部署时,您的自定义 XCom 后端需要驻留在 config
目录中,否则在图表部署期间无法找到它。
一个已观察到的问题是,从容器中获取日志非常困难,因为获取跟踪的可用窗口非常小。只有在您足够幸运地在正确的时间查询并获取容器日志时,才能确定根本原因。这反过来又会阻止整个 Helm 图表成功部署。