任务 Docker 装饰器

@task.docker 装饰器封装的 Python 可调用对象及其参数将在 docker 容器内执行。

参数

Docker 任务装饰器支持以下参数。

multiple_outputs

如果设置,函数返回值将被展开为多个 XCom 值。字典将以键作为 XCom 键展开为 XCom 值。默认为 False。

use_dill

是否使用 dill 或 pickle 进行序列化

python_command

执行函数的 Python 命令,默认为 python3

image

用于创建容器的 Docker 镜像。如果省略镜像标签,将使用“latest”。

api_version

远程 API 版本。设置为 auto 可自动检测服务器版本。

container_name

容器名称。可选(模板化)

cpus

分配给容器的 CPU 数量。此值将乘以 1024。

docker_url

运行 docker 守护进程的主机 URL。默认为 DOCKER_HOST 环境变量的值,如果未设置,则为 unix://var/run/docker.sock。

environment

在容器中设置的环境变量。(模板化)

private_environment

在容器中设置的私有环境变量。这些变量不是模板化的,并且在网站上隐藏。

env_file

.env 文件的相对路径,其中包含要在容器中设置的环境变量。会被 environment 参数中的变量覆盖。

force_pull

每次运行时拉取 docker 镜像。默认为 False。

mem_limit

容器可使用的最大内存量。可以是表示字节限制的浮点值,或形如 128m1g 的字符串。

host_tmp_dir

指定主机上临时目录的位置,该目录将映射到 tmp_dir。如果未提供,默认为使用标准系统临时目录。

network_mode

容器的网络模式。可以是以下之一

  • "bridge": 使用默认 docker bridge 网络为容器创建新的网络堆栈

  • "none": 此容器没有网络

  • "container:<name>""container:<id>": 使用通过 指定的另一个容器的网络堆栈

  • "host": 使用主机网络堆栈。与 port_bindings 不兼容

  • "<network-name>""<network-id>": 将容器连接到用户创建的网络(使用 docker network create 命令)

tls_ca_cert

PEM 编码的证书颁发机构文件的路径,用于保护 docker 连接。

tls_client_cert

PEM 编码的客户端证书文件的路径,用于验证 docker 客户端。

tls_client_key

PEM 编码的客户端密钥文件的路径,用于验证 docker 客户端。

tls_verify

设置为 True 以验证所提供证书的有效性。

tls_hostname

要与 docker 服务器证书匹配的主机名,或设置为 False 以禁用检查。

tls_ssl_version

与 docker 守护进程通信时使用的 SSL 版本。

mount_tmp_dir

指定临时目录是否应从主机绑定挂载到容器。

tmp_dir

容器内部的挂载点,对应于操作员在主机上创建的临时目录。该路径也可通过容器内的环境变量 AIRFLOW_TMP_DIR 获取。

user

docker 容器内的默认用户。

mounts

要挂载到容器内的挂载点列表,例如 ['/host/path:/container/path', '/host/path2:/container/path2:ro']

working_dir

在容器上设置的工作目录(相当于 docker 客户端的 -w 开关)

entrypoint

覆盖镜像的默认 ENTRYPOINT

xcom_all

推送所有标准输出还是仅推送最后一行。默认为 False(最后一行)。

docker_conn_id

要使用的 Airflow 连接 ID

dns

Docker 自定义 DNS 服务器

dns_search

Docker 自定义 DNS 搜索域

auto_remove

当容器进程退出时启用容器的移除。可能的值

  • never: (默认)不移除容器

  • success: 成功时移除

  • force: 总是移除容器

shm_size

/dev/shm 的大小(字节)。大小必须大于 0。如果省略,使用系统默认值。

tty

为容器分配伪 TTY。需要设置此项才能查看 Docker 容器的日志。

hostname

容器的可选主机名。

privileged

授予此容器扩展权限。

cap_add

包含容器能力

extra_hosts

在容器内部解析的额外主机名,以主机名到 IP 地址的映射表示。

timeout

API 调用的默认超时时间,单位为秒。

device_requests

将主机资源(如 GPU)暴露给容器。

log_opts_max_size

日志滚动前的最大大小。一个正整数加上表示度量单位(k、m 或 g)的修饰符。例如:10m 或 1g。默认为 -1(无限制)。

log_opts_max_file

最多可以存在的日志文件数量。如果滚动日志创建了多余的文件,将删除最旧的文件。仅在设置了 max-size 时有效。一个正整数。默认为 1。

ipc_mode

设置容器的 IPC 模式。

skip_on_exit_code

如果任务以此退出码退出,将任务保持在 skipped 状态(默认:None)。如果设置为 None,任何非零退出码都将被视为失败。

port_bindings

将容器的端口发布到主机。这是一个字典值,其中键表示要在容器内打开的端口,值表示绑定到容器端口的主机端口。与 network_mode 中的 "host" 不兼容。

ulimits

要为容器设置的 ulimit 选项列表。每个项应为一个 docker.types.Ulimit 实例。

使用示例

tests/system/docker/example_taskflow_api_docker_virtualenv.py

    @task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

此条目有帮助吗?