airflow.providers.docker.operators.docker

实现 Docker 操作符。

DockerOperator

在 Docker 容器内部执行命令。

函数

stringify(line)

确保即使传入的是 bytes 也返回字符串。Docker 流可能返回 bytes。

fetch_logs(log_stream, log)

模块内容

airflow.providers.docker.operators.docker.stringify(line)[source]

确保即使传入的是 bytes 也返回字符串。Docker 流可能返回 bytes。

airflow.providers.docker.operators.docker.fetch_logs(log_stream, log)[source]
class airflow.providers.docker.operators.docker.DockerOperator(*, image, api_version=None, command=None, container_name=None, cpus=1.0, docker_url=None, environment=None, private_environment=None, env_file=None, force_pull=False, mem_limit=None, host_tmp_dir=None, network_mode=None, tls_ca_cert=None, tls_client_cert=None, tls_client_key=None, tls_verify=True, tls_hostname=None, tls_ssl_version=None, mount_tmp_dir=True, tmp_dir='/tmp/airflow', user=None, mounts=None, entrypoint=None, working_dir=None, xcom_all=False, docker_conn_id=None, dns=None, dns_search=None, auto_remove='never', shm_size=None, tty=False, hostname=None, privileged=False, cap_add=None, extra_hosts=None, retrieve_output=False, retrieve_output_path=None, timeout=DEFAULT_TIMEOUT_SECONDS, device_requests=None, log_opts_max_size=None, log_opts_max_file=None, ipc_mode=None, skip_on_exit_code=None, port_bindings=None, ulimits=None, labels=None, **kwargs)[source]

Bases: airflow.providers.docker.version_compat.BaseOperator

在 Docker 容器内部执行命令。

默认情况下,Airflow 会在宿主机上创建一个临时目录并将其挂载到容器中,以便在容器内部存放总大小超过默认 10 GB 磁盘配额的文件。挂载目录的路径可通过容器内的环境变量 AIRFLOW_TMP_DIR 访问。

如果挂载卷失败,日志会打印警告并尝试在不挂载临时文件夹的情况下执行 Docker 命令。此行为可在使用远程 Docker 引擎或 Docker‑in‑Docker(且宿主机目录未与 Docker 引擎共享)的场景下保持默认可用。

如果明确知道 DockerOperator 被用于远程引擎或 Docker‑in‑Docker,建议将 mount_tmp_dir 参数设为 False。此时仍可通过 mounts 参数挂载已经在 Docker 引擎中存在的命名卷,以实现类似的“大文件存储”能力。

如果在拉取镜像前需要登录私有仓库,需要在 Airflow 中配置 Docker 连接,并在参数 docker_conn_id 中提供其 connection ID。

参数:
  • image (str) – 用于创建容器的 Docker 镜像。若未指定镜像标签,默认使用 “latest”。(支持模板化)

  • api_version (str | None) – 远程 API 版本。设为 auto 时会自动检测服务器的版本。

  • command (str | list[str] | None) – 要在容器内执行的命令。(支持模板化)

  • container_name (str | None) – 容器的名称。可选。(支持模板化)

  • cpus (float) – 为容器分配的 CPU 数量。该值会乘以 1024。参考 Docker CPU 共享约束文档

  • docker_url (str | list[str] | None) – Docker 守护进程所在主机的 URL 或 URL 列表。默认取环境变量 DOCKER_HOST 的值,若未设置则使用 unix://var/run/docker.sock

  • environment (dict | None) – 要在容器内设置的环境变量。(支持模板化)

  • private_environment (dict | None) – 要在容器内设置的私有环境变量。该参数不参与模板渲染,在 UI 中也不可见。

  • env_file (str | None) – 相对路径,指向包含环境变量的 .env 文件。变量会被 environment 参数覆盖。(支持模板化)

  • force_pull (bool) – 每次运行时都拉取 Docker 镜像。默认 False。

  • mem_limit (float | str | None) – 容器可使用的最大内存。可以是表示字节数的浮点数,或形如 128m1g 的字符串。

  • host_tmp_dir (str | None) – 指定宿主机上临时目录的存放位置,该目录会映射到 tmp_dir。若未提供,默认使用系统临时目录。

  • network_mode (str | None) –

    容器的网络模式,可取以下值之一:

    • "bridge":为容器创建基于默认 Docker 桥接网络的全新网络栈。

    • "none":容器不使用网络。

    • "container:<name|id>":共享指定容器(通过名称或 ID)的网络栈。

    • "host":直接使用宿主机的网络栈。此模式与 port_bindings 不兼容。

    • "<network-name>|<network-id>":连接到用户自建的网络(通过 docker network create 命令创建)。

  • tls_ca_cert (str | None) – PEM 编码的 CA 证书路径,用于加密 Docker 连接。

  • tls_client_cert (str | None) – PEM 编码的客户端证书路径,用于 Docker 客户端身份验证。

  • tls_client_key (str | None) – PEM 编码的客户端密钥路径,用于 Docker 客户端身份验证。

  • tls_verify (bool) – 设置为 True 时会校验提供的证书是否合法。

  • tls_hostname (str | bool | None) – 用于匹配 Docker 服务器证书的主机名;设为 False 则禁用此检查。

  • tls_ssl_version (str | None) – 与 Docker 守护进程通信时使用的 SSL 版本。

  • mount_tmp_dir (bool) – 是否将宿主机的临时目录通过 bind‑mount 方式挂载到容器内。默认 True。

  • tmp_dir (str) – 操作符在宿主机创建的临时目录在容器内部的挂载点。该路径同样会通过环境变量 AIRFLOW_TMP_DIR 暴露给容器内部的进程。

  • user (str | int | None) – 在容器内使用的默认用户。

  • mounts (list[docker.types.Mount] | None) – 将卷挂载到容器的列表。每项应为 docker.types.Mount 实例。(支持模板化)

  • entrypoint (str | list[str] | None) – 覆盖镜像默认的 ENTRYPOINT。

  • working_dir (str | None) – 容器内部的工作目录(相当于 Docker 客户端的 -w 参数)。

  • xcom_all (bool) – 将全部 stdout 推送至 XCom,还是仅推送最后一行。默认 False(仅最后一行)。

  • docker_conn_id (str | None) – Docker 连接 ID

  • dns (list[str] | None) – Docker 自定义 DNS 服务器列表。

  • dns_search (list[str] | None) – Docker 自定义 DNS 搜索域。

  • auto_remove (Literal['never', 'success', 'force']) –

    容器进程退出后是否自动删除容器。可选值:

    • never:默认,不删除容器。

    • success:成功时删除。

    • force:始终删除容器。

  • shm_size (int | None) – /dev/shm 的大小(字节),必须大于 0。若未指定则使用系统默认。

  • tty (bool) – 为容器分配伪终端 (pseudo‑TTY)。需开启此选项才能查看容器日志。

  • hostname (str | None) – 为容器指定的可选主机名。

  • privileged (bool) – 为容器授予扩展权限。

  • cap_add (collections.abc.Iterable[str] | None) – 向容器添加的 Linux 能力列表。

  • extra_hosts (dict[str, str] | None) – 在容器内部额外的 hostname↔IP 映射。

  • retrieve_output (bool) – 是否在容器运行期间持续尝试从镜像中拉取输出文件并在手动关闭镜像前写入。常用于希望将 pickle 序列化的结果写入文件而非日志的场景。

  • retrieve_output_path (str | None) – 待检索的输出文件路径,文件内容将通过 XCom 传递。

  • timeout (int) – API 调用的超时时间(秒),默认 60 秒。

  • device_requests (list[docker.types.DeviceRequest] | None) – 将主机资源(如 GPU)暴露给容器。

  • log_opts_max_size (str | None) – 日志文件在滚动前的最大大小。格式为正整数加单位后缀(k、m、g),例如 10m1g;默认 -1 表示无限制。

  • log_opts_max_file (str | None) – 可保留的最大日志文件数量。若滚动产生的文件数超出此值,最旧的文件会被删除。仅在设置 max-size 时生效。正整数,默认 1。

  • ipc_mode (str | None) – 为容器设置的 IPC(进程间通信)模式。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 当任务返回该退出码时,将任务状态置为 skipped(默认 None)。若设为 None,任何非零退出码都会被视为失败。

  • port_bindings (dict | None) – 将容器端口映射到宿主机端口的字典。键为容器内部端口,值为宿主机对应端口。与 network_mode"host" 时不兼容。

  • ulimits (list[docker.types.Ulimit] | None) – 为容器设置的 ulimit 选项列表。每项应为 docker.types.Ulimit 实例。

  • labels (dict[str, str] | list[str] | None) – 标签字典(例如 {"label1":"value1","label2":"value2"})或仅包含标签名的列表(例如 ["label1","label2"]),后者的标签值为空。

template_fields: collections.abc.Sequence[str] = ('image', 'command', 'environment', 'env_file', 'container_name', 'mounts')[source]
template_fields_renderers[source]
template_ext: collections.abc.Sequence[str] = ('.sh', '.bash', '.env')[source]
api_version = None[source]
auto_remove = 'never'[source]
command = None[source]
container_name = None[source]
cpus = 1.0[source]
dns = None[source]
docker_url[source]
environment[源码]
env_file = None[源码]
force_pull = False[源码]
image[源码]
mem_limit = None[源码]
host_tmp_dir = None[源码]
network_mode = None[源码]
tls_ca_cert = None[源码]
tls_client_cert = None[源码]
tls_client_key = None[源码]
tls_verify = True[源码]
tls_hostname = None[源码]
tls_ssl_version = None[源码]
mount_tmp_dir = True[源码]
tmp_dir = '/tmp/airflow'[源码]
user = None[源码]
mounts = [][源码]
entrypoint = None[源码]
working_dir = None[源码]
xcom_all = False[源码]
docker_conn_id = None[源码]
shm_size = None[源码]
tty = False[源码]
hostname = None[源码]
privileged = False[源码]
cap_add = None[源码]
extra_hosts = None[源码]
ulimits = [][源码]
labels = None[源码]
container: dict = None[源码]
retrieve_output = False[源码]
retrieve_output_path = None[源码]
timeout = 60[源码]
device_requests = None[源码]
log_opts_max_size = None[源码]
log_opts_max_file = None[源码]
ipc_mode = None[源码]
skip_on_exit_code[源码]
port_bindings[源码]
属性 hook: airflow.providers.docker.hooks.docker.DockerHook[源码]

创建并返回一个 DockerHook(缓存)。

属性 cli: docker.APIClient[源码]
execute(context)[源码]

在创建算子时派生。

执行任务的主要方法。Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参考 get_template_context。

静态 format_command(command)[源码]

获取命令。

如果命令字符串以 [ 开头,则视为 Python 字面量并解析为命令列表。

参数:

command (list[str] | str | None) – Docker 命令或入口点

返回:

命令(或命令列表)

返回类型:

list[str] | str | None

on_kill()[源码]

当任务实例被终止时,重写此方法以清理子进程。

在算子内部使用 threading、subprocess 或 multiprocessing 模块时,需要进行清理,否则会留下僵尸进程。

静态 unpack_environment_variables(env_str)[源码]

从字符串解析环境变量。

参数:

env_str (str) – 形如 {key}={value}、以 \n(换行)分隔的环境变量字符串

返回:

包含解析后环境变量的字典

返回类型:

dict

此条目是否有帮助?