序列化

为了支持任务之间的数据交换(例如参数),Airflow 需要将要交换的数据序列化,并在下游任务中需要时再次反序列化。序列化还会发生,以便 Web 服务器和调度程序(而不是 DAG 处理器)无需读取 DAG 文件。这样做是为了安全和效率。

序列化是一项令人惊讶的艰巨工作。开箱即用的 Python 仅支持对基本类型(如 strint)进行序列化,并且它循环遍历可迭代对象。当事情变得更复杂时,需要自定义序列化。

Airflow 开箱即用支持三种自定义序列化方式。基本类型按原样返回,无需额外编码,例如 str 保持为 str。当它不是基本类型(或其可迭代对象)时,Airflow 会在 airflow.serialization.serializers 命名空间中查找已注册的序列化器和反序列化器。如果未找到,它将在类中查找 serialize() 方法,或者在反序列化时查找 deserialize(data, version: int) 方法。最后,如果类使用 @dataclass@attr.define 修饰,它将使用这些装饰器的公共方法。

如果您希望使用新的序列化器扩展 Airflow,了解何时选择哪种序列化方式非常重要。受 Airflow 控制的对象,即位于 airflow.* 命名空间下,例如 airflow.model.dag.DAG 或受开发者控制的对象,例如 my.company.Foo,应首先检查它们是否可以用 @attr.define@dataclass 修饰。如果无法实现,则应实现 serializedeserialize 方法。 serialize 方法应返回基本类型或字典。它不需要序列化字典中的值,系统会负责处理,但键应为基本形式。

不受 Airflow 控制的对象,例如 numpy.int16 将需要一个已注册的序列化器和反序列化器。需要版本控制。可以返回基本类型,也可以返回字典。同样,dict 值不需要序列化,但其键需要为基本形式。如果您正在实现已注册的序列化器,请特别注意不要出现循环导入。通常,可以通过使用 str 为序列化器列表填充内容来避免这种情况。如下所示:serializers = ["my.company.Foo"] 而不是 serializers = [Foo]

注意

序列化和反序列化依赖于速度。尽可能多地使用 dict 等内置函数,避免使用类和其他复杂结构。

Airflow 对象

from typing import Any, ClassVar


class Foo:
    __version__: ClassVar[int] = 1

    def __init__(self, a, v) -> None:
        self.a = a
        self.b = {"x": v}

    def serialize(self) -> dict[str, Any]:
        return {
            "a": self.a,
            "b": self.b,
        }

    @staticmethod
    def deserialize(data: dict[str, Any], version: int):
        f = Foo(a=data["a"])
        f.b = data["b"]
        return f

已注册

from __future__ import annotations

from decimal import Decimal
from typing import TYPE_CHECKING

from airflow.utils.module_loading import qualname

if TYPE_CHECKING:
    from airflow.serialization.serde import U


serializers = [
    Decimal
]  # this can be a type or a fully qualified str. Str can be used to prevent circular imports
deserializers = serializers  # in some cases you might not have a deserializer (e.g. k8s pod)

__version__ = 1  # required


# the serializer expects output, classname, version, is_serialized?
def serialize(o: object) -> tuple[U, str, int, bool]:
    if isinstance(o, Decimal):
        name = qualname(o)
        _, _, exponent = o.as_tuple()
        if exponent >= 0:  # No digits after the decimal point.
            return int(o), name, __version__, True
            # Technically lossy due to floating point errors, but the best we
            # can do without implementing a custom encode function.
        return float(o), name, __version__, True

    return "", "", 0, False


# the deserializer sanitizes the data for you, so you do not need to deserialize values yourself
def deserialize(classname: str, version: int, data: object) -> Decimal:
    # always check version compatibility
    if version > __version__:
        raise TypeError(f"serialized {version} of {classname} > {__version__}")

    if classname != qualname(Decimal):
        raise TypeError(f"{classname} != {qualname(Decimal)}")

    return Decimal(str(data))

此条目是否有用?