Creating Custom @task Decorators
As of Airflow 2.2, you can add custom decorators to the TaskFlow interface from within a provider package and have those decorators appear natively as part of the @task.____ design.
For example, suppose you want an easier mechanism for running Python functions as "foo" tasks. The steps to create and register @task.foo are:
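Create a FooDecoratedOperator

In this case, we assume that you have an existing FooOperator that takes a Python function as an argument. By creating a FooDecoratedOperator that inherits from both FooOperator and airflow.decorators.base.DecoratedOperator, Airflow will supply much of the functionality required to treat your new class as a TaskFlow-native class.

You should also override the custom_operator_name attribute to give the task a custom name. For example, _DockerDecoratedOperator in the apache-airflow-providers-docker provider sets this to @task.docker to indicate the decorator name it implements.

Create a foo_task function

Once you have your decorated class, create a function like the following to convert the new FooDecoratedOperator into a TaskFlow function decorator:

    from collections.abc import Callable
    from typing import TYPE_CHECKING

    from airflow.sdk.bases.decorator import task_decorator_factory

    if TYPE_CHECKING:
        from airflow.sdk.bases.decorator import TaskDecorator


    def foo_task(
        python_callable: Callable | None = None,
        multiple_outputs: bool | None = None,
        **kwargs,
    ) -> "TaskDecorator":
        # FooDecoratedOperator is the class created in the previous step.
        return task_decorator_factory(
            python_callable=python_callable,
            multiple_outputs=multiple_outputs,
            decorated_operator_class=FooDecoratedOperator,
            **kwargs,
        )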
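The two steps above can be sketched without Airflow installed. The block below is a simplified stand-in, not Airflow's implementation: `FooOperator`, `DecoratedOperator`, and `task_decorator_factory` are hypothetical local versions defined only so the example runs on its own. It illustrates how the decorated class combines both bases, where `custom_operator_name` is set, and why `python_callable` defaults to `None` (so the decorator works both bare and with arguments).

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any


class FooOperator:
    """Hypothetical existing operator that runs a Python function."""

    def __init__(self, *, python_callable: Callable, task_id: str, **kwargs: Any) -> None:
        self.python_callable = python_callable
        self.task_id = task_id

    def execute(self) -> Any:
        return self.python_callable()


class DecoratedOperator:
    """Local stand-in for Airflow's DecoratedOperator (assumption, greatly simplified)."""

    custom_operator_name: str = ""

    def __init__(self, *, multiple_outputs: bool = False, **kwargs: Any) -> None:
        self.multiple_outputs = multiple_outputs
        super().__init__(**kwargs)  # continues to FooOperator in the MRO


class FooDecoratedOperator(DecoratedOperator, FooOperator):
    """Step 1: combine both bases and name the decorator this class implements."""

    custom_operator_name = "@task.foo"


def task_decorator_factory(
    python_callable: Callable | None = None,
    *,
    multiple_outputs: bool | None = None,
    decorated_operator_class: type,
    **kwargs: Any,
):
    """Stand-in factory supporting both ``@foo_task`` and ``@foo_task(...)``."""

    def decorator(func: Callable):
        def make_task(*op_args: Any, **op_kwargs: Any):
            # Calling the decorated function builds an operator instead of running it.
            return decorated_operator_class(
                python_callable=lambda: func(*op_args, **op_kwargs),
                task_id=kwargs.get("task_id", func.__name__),
                multiple_outputs=bool(multiple_outputs),
            )

        return make_task

    # Bare usage passes the function directly; called usage passes None first.
    return decorator(python_callable) if python_callable is not None else decorator


def foo_task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs: Any):
    """Step 2: bind the factory to FooDecoratedOperator."""
    return task_decorator_factory(
        python_callable=python_callable,
        multiple_outputs=multiple_outputs,
        decorated_operator_class=FooDecoratedOperator,
        **kwargs,
    )


@foo_task
def add_one(x: int) -> int:
    return x + 1


@foo_task(task_id="custom_id", multiple_outputs=True)
def pair(x: int) -> dict:
    return {"value": x, "double": 2 * x}


bare = add_one(1)   # operator built from the bare decorator
called = pair(3)    # operator built from the parameterized decorator
```

In a real provider, the two base classes and the factory come from Airflow itself, so only `FooDecoratedOperator` and `foo_task` would need to be written.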
Register your new decorator in get_provider_info of your provider

Finally, add a task-decorators key to the dict returned from the provider entrypoint, as described in How to create your own provider. Its value should be a list in which each item contains name and class-name keys. When Airflow starts, the ProviderManager class will automatically import this value, and task.foo will work as a new decorator:

    def get_provider_info():
        return {
            "package-name": "foo-provider-airflow",
            "name": "Foo",
            "task-decorators": [
                {
                    "name": "foo",
                    # "Import path" and function name of the `foo_task`
                    "class-name": "name.of.python.package.foo_task",
                }
            ],
            # ...
        }

Please note that name must be a valid Python identifier.
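Before shipping a provider, it can help to check the task-decorators entries locally. The following is a minimal sketch using only the standard library. The helpers `validate_task_decorators` and `import_from_path` are hypothetical names introduced here, not Airflow APIs, and `json.dumps` merely stands in for the import path of a real `foo_task` function. Rejecting Python keywords is an extra assumption on top of the identifier rule, since a name such as `class` could not be written as `task.class`.

```python
from __future__ import annotations

import importlib
import keyword


def validate_task_decorators(provider_info: dict) -> list[tuple[str, str]]:
    """Return (name, class-name) pairs, raising ValueError on malformed entries."""
    validated = []
    for entry in provider_info.get("task-decorators", []):
        missing = {"name", "class-name"} - entry.keys()
        if missing:
            raise ValueError(f"task-decorators entry is missing keys: {sorted(missing)}")
        name = entry["name"]
        # The name becomes an attribute (task.<name>), so it must be an identifier.
        if not name.isidentifier() or keyword.iskeyword(name):
            raise ValueError(f"{name!r} is not a valid Python identifier")
        validated.append((name, entry["class-name"]))
    return validated


def import_from_path(dotted_path: str):
    """Split 'package.module.attr' at the last dot and import the attribute."""
    module_path, _, attr = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), attr)


provider_info = {
    "package-name": "foo-provider-airflow",
    "name": "Foo",
    "task-decorators": [
        # "json.dumps" stands in for the import path of a real foo_task function.
        {"name": "foo", "class-name": "json.dumps"},
    ],
}

decorators = {
    name: import_from_path(path)
    for name, path in validate_task_decorators(provider_info)
}
```

A check like this surfaces a misspelled import path or an invalid name at development time rather than when Airflow starts.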
(Optional) Adding IDE auto-completion support
Note
This section mostly applies to providers managed by Apache Airflow. We have not decided whether to allow third-party providers to register auto-completion in this way.
For better or worse, Python IDEs cannot auto-complete dynamically generated methods (see JetBrains' write-up on the subject).
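The limitation is easy to reproduce with plain Python. The sketch below uses a hypothetical `DecoratorCollection` class that resolves decorators by name through `__getattr__`. It is an illustration of dynamic attribute lookup in general, under the assumption that a registry is only known at runtime, and it is not a copy of Airflow's code.

```python
class DecoratorCollection:
    """Hypothetical collection that resolves decorators by name at runtime."""

    def __init__(self, registry: dict) -> None:
        self._registry = registry

    def __getattr__(self, name: str):
        # Only called when normal attribute lookup fails.
        try:
            return self._registry[name]
        except KeyError:
            raise AttributeError(f"no decorator named {name!r}") from None


def foo(func):
    """Stand-in decorator that just tags the function."""
    func.is_foo_task = True
    return func


task = DecoratorCollection({"foo": foo})


@task.foo
def hello() -> str:
    return "hi"


# The attribute works at runtime but does not show up when inspecting the object.
visible_statically = "foo" in dir(task)
```

Because `foo` exists only as a dictionary entry, nothing in the class body tells a static analyzer that `task.foo` is valid, which is the gap a type stub fills.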
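To work around this problem, a type stub file, airflow/sdk/definitions/decorators/__init__.pyi, is provided to statically declare the type signature of each task decorator. A newly added task decorator should declare its signature stub like this: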
task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
def docker(
    self,
    *,
    multiple_outputs: bool | None = None,
    python_command: str = "python3",
    serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
    use_dill: bool = False,  # Added by _DockerDecoratedOperator.
    # 'command', 'retrieve_output', and 'retrieve_output_path' are filled by
    # _DockerDecoratedOperator.
    image: str,
    api_version: str | None = None,
    container_name: str | None = None,
    cpus: float = 1.0,
    docker_url: str | None = None,
    environment: dict[str, str] | None = None,
    private_environment: dict[str, str] | None = None,
    env_file: str | None = None,
    force_pull: bool = False,
    mem_limit: float | str | None = None,
    host_tmp_dir: str | None = None,
    network_mode: str | None = None,
    tls_ca_cert: str | None = None,
    tls_client_cert: str | None = None,
    tls_client_key: str | None = None,
    tls_verify: bool = True,
    tls_hostname: str | bool | None = None,
    tls_ssl_version: str | None = None,
    mount_tmp_dir: bool = True,
    tmp_dir: str = "/tmp/airflow",
    user: str | int | None = None,
    mounts: list[Mount] | None = None,
    entrypoint: str | list[str] | None = None,
    working_dir: str | None = None,
    xcom_all: bool = False,
    docker_conn_id: str | None = None,
    dns: list[str] | None = None,
    dns_search: list[str] | None = None,
    auto_remove: Literal["never", "success", "force"] = "never",
    shm_size: int | None = None,
    tty: bool = False,
    hostname: str | None = None,
    privileged: bool = False,
    cap_add: str | None = None,
    extra_hosts: dict[str, str] | None = None,
    timeout: int = 60,
    device_requests: list[dict] | None = None,
    log_opts_max_size: str | None = None,
    log_opts_max_file: str | None = None,
    ipc_mode: str | None = None,
    skip_on_exit_code: int | Container[int] | None = None,
    port_bindings: dict | None = None,
    ulimits: list[dict] | None = None,
    labels: dict[str, str] | list[str] | None = None,
    **kwargs,
) -> TaskDecorator:
    """Create a decorator to convert the decorated callable to a Docker task.

    :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
        Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
    :param python_command: Python command for executing functions, Default: python3
    :param serializer: Which serializer use to serialize the args and result. It can be one of the following:

        - ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
        - ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
          this requires to include cloudpickle in your requirements.
        - ``"dill"``: Use dill for serialize more complex types,
          this requires to include dill in your requirements.
    :param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
        the args and result (pickle is default). This allows more complex types
        but requires you to include dill in your requirements.
    :param image: Docker image from which to create the container.
        If image tag is omitted, "latest" will be used.
    :param api_version: Remote API version. Set to ``auto`` to automatically
        detect the server's version.
    :param container_name: Name of the container. Optional (templated)
    :param cpus: Number of CPUs to assign to the container.
        This value gets multiplied with 1024. See
        https://docs.docker.net.cn/engine/reference/run/#cpu-share-constraint
    :param docker_url: URL of the host running the docker daemon.
        Default is the value of the ``DOCKER_HOST`` environment variable or unix://var/run/docker.sock
        if it is unset.
    :param environment: Environment variables to set in the container. (templated)
    :param private_environment: Private environment variables to set in the container.
        These are not templated, and hidden from the website.
    :param env_file: Relative path to the ``.env`` file with environment variables to set in the container.
        Overridden by variables in the environment parameter.
    :param force_pull: Pull the docker image on every run. Default is False.
    :param mem_limit: Maximum amount of memory the container can use.
        Either a float value, which represents the limit in bytes,
        or a string like ``128m`` or ``1g``.
    :param host_tmp_dir: Specify the location of the temporary directory on the host which will
        be mapped to tmp_dir. If not provided defaults to using the standard system temp directory.
    :param network_mode: Network mode for the container. It can be one of the following:

        - ``"bridge"``: Create new network stack for the container with default docker bridge network
        - ``"none"``: No networking for this container
        - ``"container:<name|id>"``: Use the network stack of another container specified via <name|id>
        - ``"host"``: Use the host network stack. Incompatible with `port_bindings`
        - ``"<network-name>|<network-id>"``: Connects the container to user created network
          (using ``docker network create`` command)
    :param tls_ca_cert: Path to a PEM-encoded certificate authority
        to secure the docker connection.
    :param tls_client_cert: Path to the PEM-encoded certificate
        used to authenticate docker client.
    :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
    :param tls_verify: Set ``True`` to verify the validity of the provided certificate.
    :param tls_hostname: Hostname to match against
        the docker server certificate or False to disable the check.
    :param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
    :param mount_tmp_dir: Specify whether the temporary directory should be bind-mounted
        from the host to the container. Defaults to True
    :param tmp_dir: Mount point inside the container to
        a temporary directory created on the host by the operator.
        The path is also made available via the environment variable
        ``AIRFLOW_TMP_DIR`` inside the container.
    :param user: Default user inside the docker container.
    :param mounts: List of mounts to mount into the container, e.g.
        ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
    :param entrypoint: Overwrite the default ENTRYPOINT of the image
    :param working_dir: Working directory to
        set on the container (equivalent to the -w switch the docker client)
    :param xcom_all: Push all the stdout or just the last line.
        The default is False (last line).
    :param docker_conn_id: The :ref:`Docker connection id <howto/connection:docker>`
    :param dns: Docker custom DNS servers
    :param dns_search: Docker custom DNS search domain
    :param auto_remove: Enable removal of the container when the container's process exits. Possible values:

        - ``never``: (default) do not remove container
        - ``success``: remove on success
        - ``force``: always remove container
    :param shm_size: Size of ``/dev/shm`` in bytes. The size must be
        greater than 0. If omitted uses system default.
    :param tty: Allocate pseudo-TTY to the container
        This needs to be set see logs of the Docker container.
    :param hostname: Optional hostname for the container.
    :param privileged: Give extended privileges to this container.
    :param cap_add: Include container capabilities
    :param extra_hosts: Additional hostnames to resolve inside the container,
        as a mapping of hostname to IP address.
    :param device_requests: Expose host resources such as GPUs to the container.
    :param log_opts_max_size: The maximum size of the log before it is rolled.
        A positive integer plus a modifier representing the unit of measure (k, m, or g).
        Eg: 10m or 1g Defaults to -1 (unlimited).
    :param log_opts_max_file: The maximum number of log files that can be present.
        If rolling the logs creates excess files, the oldest file is removed.
        Only effective when max-size is also set. A positive integer. Defaults to 1.
    :param ipc_mode: Set the IPC mode for the container.
    :param skip_on_exit_code: If task exits with this exit code, leave the task
        in ``skipped`` state (default: None). If set to ``None``, any non-zero
        exit code will be treated as a failure.
    :param port_bindings: Publish a container's port(s) to the host. It is a
        dictionary of value where the key indicates the port to open inside the container
        and value indicates the host port that binds to the container port.
        Incompatible with ``"host"`` in ``network_mode``.
    :param ulimits: List of ulimit options to set for the container. Each item should
        be a :py:class:`docker.types.Ulimit` instance.
    :param labels: A dictionary of name-value labels (e.g. ``{"label1": "value1", "label2": "value2"}``)
        or a list of names of labels to set with empty values (e.g. ``["label1", "label2"]``)
    """
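The signature should accept keyword-only arguments only, including one named multiple_outputs, which is automatically provided by default. All other arguments should be copied directly from the real FooOperator. We recommend adding a comment to explain which arguments are filled in automatically by FooDecoratedOperator and are therefore not included.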
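These signature rules can be checked mechanically. The following is a minimal sketch using the standard library's `inspect` module. `check_decorator_signature`, the `Decorators` class, and the `retries` parameter are hypothetical and exist only for illustration; a real stub lives in a `.pyi` file and would be checked by a type checker rather than at runtime.

```python
from __future__ import annotations

import inspect


def check_decorator_signature(func) -> list[str]:
    """Return a list of problems found in a decorator signature (empty if none)."""
    problems = []
    params = inspect.signature(func).parameters
    for name, param in params.items():
        # 'self' and **kwargs are allowed alongside keyword-only parameters.
        if name == "self" or param.kind is inspect.Parameter.VAR_KEYWORD:
            continue
        if param.kind is not inspect.Parameter.KEYWORD_ONLY:
            problems.append(f"{name} is not keyword-only")
    if "multiple_outputs" not in params:
        problems.append("multiple_outputs is missing")
    return problems


class Decorators:
    # Hypothetical stub-like signature for @task.foo.
    def foo(
        self,
        *,
        multiple_outputs: bool | None = None,
        # 'python_callable' is filled by FooDecoratedOperator.
        retries: int = 0,
        **kwargs,
    ): ...

    # Deliberately wrong: positional parameter and no multiple_outputs.
    def bad(self, image, **kwargs): ...


good_report = check_decorator_signature(Decorators.foo)
bad_report = check_decorator_signature(Decorators.bad)
```

The first signature follows both rules, while the second is reported for its positional `image` parameter and the missing `multiple_outputs`.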
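If the new decorator can be used without arguments (e.g. @task.python instead of @task.python()), you should also add an overload that takes a single callable immediately after the "real" definition, so that mypy can recognize the function as a "bare decorator".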
task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
@overload
def python(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
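The same overload idea can be tried in an ordinary Python module. The sketch below is an assumption-laden simplification: the `foo` decorator is hypothetical, it returns the function itself rather than a `Task[FParams, FReturn]`, and in a `.py` file the overloads must precede a runtime implementation, whereas a stub contains only declarations. It shows one overload for the parameterized form and one for the bare form.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any, TypeVar, overload

F = TypeVar("F", bound=Callable[..., Any])


@overload
def foo(*, multiple_outputs: bool | None = None, **kwargs: Any) -> Callable[[F], F]: ...
@overload
def foo(python_callable: F) -> F: ...
def foo(python_callable=None, *, multiple_outputs=None, **kwargs):
    """Runtime implementation shared by both overloads."""

    def decorator(func):
        # Record the option on the function so the effect is observable.
        func.multiple_outputs = bool(multiple_outputs)
        return func

    # Bare usage receives the function directly; called usage receives None.
    return decorator(python_callable) if python_callable is not None else decorator


@foo
def bare() -> int:
    return 1


@foo(multiple_outputs=True)
def called() -> dict:
    return {"a": 1}
```

With only the first overload, a type checker would reject `@foo` without parentheses, because a function is not an accepted keyword argument; the second overload is what makes the bare form type-check.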
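Once the change is merged and the next Airflow release (minor or patch) comes out, users will be able to see your decorator in IDE auto-completion. The auto-completion will vary depending on the version of the provider that the user has installed.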
Please note that this step is not required to create a working decorator, but it does give users of the provider a better experience.