airflow.providers.docker.operators.docker

实现 Docker operator。

DockerOperator

在 Docker 容器内执行命令。

函数

stringify(line)

确保即使传递的是字节也能返回字符串。Docker 流可能返回字节。

fetch_logs(log_stream, log)

模块内容

airflow.providers.docker.operators.docker.stringify(line)[source]

确保即使传递的是字节也能返回字符串。Docker 流可能返回字节。

airflow.providers.docker.operators.docker.fetch_logs(log_stream, log)[source]
class airflow.providers.docker.operators.docker.DockerOperator(*, image, api_version=None, command=None, container_name=None, cpus=1.0, docker_url=None, environment=None, private_environment=None, env_file=None, force_pull=False, mem_limit=None, host_tmp_dir=None, network_mode=None, tls_ca_cert=None, tls_client_cert=None, tls_client_key=None, tls_verify=True, tls_hostname=None, tls_ssl_version=None, mount_tmp_dir=True, tmp_dir='/tmp/airflow', user=None, mounts=None, entrypoint=None, working_dir=None, xcom_all=False, docker_conn_id=None, dns=None, dns_search=None, auto_remove='never', shm_size=None, tty=False, hostname=None, privileged=False, cap_add=None, extra_hosts=None, retrieve_output=False, retrieve_output_path=None, timeout=DEFAULT_TIMEOUT_SECONDS, device_requests=None, log_opts_max_size=None, log_opts_max_file=None, ipc_mode=None, skip_on_exit_code=None, port_bindings=None, ulimits=None, labels=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

在 Docker 容器内执行命令。

默认情况下,会在主机上创建一个临时目录并挂载到容器中,以允许存储总大小超过容器默认 10GB 磁盘空间的文件。在这种情况下,可以通过环境变量 AIRFLOW_TMP_DIR 访问挂载目录的路径。

如果无法挂载卷,将打印警告并尝试在不挂载临时目录的情况下执行 docker 命令。这是为了使其在远程 docker 引擎或运行 docker-in-docker 解决方案且临时目录未与 docker 引擎共享时也能正常工作。在这种情况下,日志中会打印警告。

如果你知道你正在使用远程引擎或通过 docker-in-docker 运行 DockerOperator,你应该将 mount_tmp_dir 参数设置为 False。在这种情况下,你仍然可以使用 mounts 参数来挂载 Docker Engine 中已有的命名卷,以实现类似的功能,即可以存储超出容器默认磁盘大小的文件。

如果在拉取镜像之前需要登录私有注册表,需要在 Airflow 中配置 Docker 连接,并通过参数 docker_conn_id 提供连接 ID。

参数:
  • image (str) – 用于创建容器的 Docker 镜像。如果省略镜像标签,将使用“latest”。(可模板化)

  • api_version (str | None) – 远程 API 版本。设置为 auto 可自动检测服务器版本。

  • command (str | list[str] | None) – 在容器中运行的命令。(可模板化)

  • container_name (str | None) – 容器的名称。可选 (可模板化)

  • cpus (float) – 分配给容器的 CPU 数量。此值乘以 1024。参阅 https://docs.docker.net.cn/engine/reference/run/#cpu-share-constraint

  • docker_url (str | list[str] | None) – 运行 docker daemon 的主机 URL 或 URL 列表。默认值为 DOCKER_HOST 环境变量的值,如果未设置则为 unix://var/run/docker.sock。

  • environment (dict | None) – 在容器中设置的环境变量。(可模板化)

  • private_environment (dict | None) – 在容器中设置的私有环境变量。这些变量不可模板化,并且在网站上隐藏。

  • env_file (str | None) – 包含要在容器中设置的环境变量的 .env 文件的相对路径。会被 environment 参数中的变量覆盖。(可模板化)

  • force_pull (bool) – 每次运行时都拉取 docker 镜像。默认值为 False。

  • mem_limit (float | str | None) – 容器可使用的最大内存量。可以是浮点值(表示字节限制),或者像 128m1g 这样的字符串。

  • host_tmp_dir (str | None) – 指定主机上的临时目录位置,该目录将映射到 tmp_dir。如果未提供,默认为使用标准系统临时目录。

  • network_mode (str | None) –

    容器的网络模式。可以是以下之一:

    • "bridge": 为容器创建新的网络栈,使用默认的 docker bridge 网络。

    • "none": 此容器没有网络。

    • "container:<name|id>": 使用通过 <name|id> 指定的另一个容器的网络栈。

    • "host": 使用主机网络栈。与 port_bindings 不兼容。

    • "<network-name>|<network-id>": 将容器连接到用户创建的网络(使用 docker network create 命令)。

  • tls_ca_cert (str | None) – 用于保护 docker 连接的 PEM 编码证书颁发机构的路径。

  • tls_client_cert (str | None) – 用于验证 docker 客户端身份的 PEM 编码证书的路径。

  • tls_client_key (str | None) – 用于验证 docker 客户端身份的 PEM 编码密钥的路径。

  • tls_verify (bool) – 设置为 True 以验证所提供证书的有效性。

  • tls_hostname (str | bool | None) – 用于匹配 docker 服务器证书的主机名,或设置为 False 以禁用检查。

  • tls_ssl_version (str | None) – 与 docker daemon 通信时使用的 SSL 版本。

  • mount_tmp_dir (bool) – 指定是否应将临时目录从主机绑定挂载到容器。默认为 True。

  • tmp_dir (str) – 容器内的挂载点,指向 operator 在主机上创建的临时目录。该路径也通过容器内的环境变量 AIRFLOW_TMP_DIR 提供。

  • user (str | int | None) – docker 容器内的默认用户。

  • mounts (list[docker.types.Mount] | None) – 要挂载到容器中的卷列表。每个项都应是一个 docker.types.Mount 实例。

  • entrypoint (str | list[str] | None) – 覆盖镜像的默认 ENTRYPOINT。

  • working_dir (str | None) – 设置容器的工作目录(等同于 docker client 的 -w 开关)。

  • xcom_all (bool) – 推送所有 stdout 或仅推送最后一行。默认值为 False(最后一行)。

  • docker_conn_id (str | None) – Docker 连接 ID

  • dns (list[str] | None) – Docker 自定义 DNS 服务器。

  • dns_search (list[str] | None) – Docker 自定义 DNS 搜索域。

  • auto_remove (typing_extensions.Literal[never, success, force]) –

    在容器进程退出时启用容器的删除。可能的值:

    • never: (默认) 不删除容器。

    • success: 成功时删除。

    • force: 始终删除容器。

  • shm_size (int | None) – /dev/shm 的大小(以字节为单位)。大小必须大于 0。如果省略,则使用系统默认值。

  • tty (bool) – 为容器分配伪 TTY。需要设置此项以查看 Docker 容器的日志。

  • hostname (str | None) – 容器的可选主机名。

  • privileged (bool) – 赋予此容器扩展权限。

  • cap_add (collections.abc.Iterable[str] | None) – 包含容器能力。

  • extra_hosts (dict[str, str] | None) – 在容器内解析的额外主机名,作为主机名到 IP 地址的映射。

  • retrieve_output (bool) – 此 docker 镜像在手动关闭镜像之前是否应始终尝试从输出文件拉取数据并输出。当用户需要未发布到日志的 pickle 序列化输出时很有用。

  • retrieve_output_path (str | None) – 将检索并传递给 xcom 的输出文件的路径。

  • timeout (int) – API 调用超时时间,单位为秒。默认值为 60 秒。

  • device_requests (list[docker.types.DeviceRequest] | None) – 向容器暴露主机资源,例如 GPU。

  • log_opts_max_size (str | None) – 日志滚动前的最大大小。一个正整数加上表示度量单位的修饰符(k、m 或 g)。例如:10m 或 1g。默认为 -1(无限制)。

  • log_opts_max_file (str | None) – 可存在的最大日志文件数。如果日志滚动创建了多余的文件,则删除最旧的文件。仅在同时设置了 max-size 时有效。一个正整数。默认为 1。

  • ipc_mode (str | None) – 设置容器的 IPC 模式。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果任务以此退出码退出,则将任务保留在 skipped 状态(默认:None)。如果设置为 None,则任何非零退出码都将被视为失败。

  • port_bindings (dict | None) – 将容器的端口发布到主机。这是一个字典,其中键表示要在容器内打开的端口,值表示绑定到容器端口的主机端口。与 network_mode 中的 "host" 不兼容。

  • ulimits (list[docker.types.Ulimit] | None) – 为容器设置的 ulimit 选项列表。每个项都应是一个 docker.types.Ulimit 实例。

  • labels (dict[str, str] | list[str] | None) – 名称-值标签的字典(例如 {"label1": "value1", "label2": "value2"})或要设置为空值的标签名称列表(例如 ["label1", "label2"])。

template_fields: collections.abc.Sequence[str] = ('image', 'command', 'environment', 'env_file', 'container_name')[source]
template_fields_renderers[source]
template_ext: collections.abc.Sequence[str] = ('.sh', '.bash', '.env')[source]
api_version = None[source]
auto_remove = 'never'[source]
command = None[source]
container_name = None[source]
cpus = 1.0[source]
dns =None[source]
docker_url[source]
environment[source]
env_file = None[source]
force_pull = False[source]
image[source]
mem_limit = None[source]
host_tmp_dir = None[source]
network_mode = None[source]
tls_ca_cert = None[source]
tls_client_cert = None[source]
tls_client_key = None[source]
tls_verify = True[source]
tls_hostname = None[source]
tls_ssl_version = None[source]
mount_tmp_dir = True[source]
tmp_dir = '/tmp/airflow'[source]
user = None[source]
mounts = [][source]
entrypoint = None[source]
working_dir = None[source]
xcom_all = False[source]
docker_conn_id = None[source]
shm_size = None[source]
tty = False[source]
hostname = None[source]
privileged = False[source]
cap_add = None[source]
extra_hosts = None[source]
ulimits = [][source]
labels = None[source]
container: dict = None[source]
retrieve_output = False[source]
retrieve_output_path = None[source]
timeout = 60[source]
device_requests = None[source]
log_opts_max_size = None[source]
log_opts_max_file = None[source]
ipc_mode = None[source]
skip_on_exit_code = None[source]
port_bindings[source]
property hook: airflow.providers.docker.hooks.docker.DockerHook[source]

创建并返回一个 DockerHook(已缓存)。

property cli: docker.APIClient[source]
execute(context)[source]

创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

static format_command(command)[source]

检索命令。

如果命令字符串以 [ 开头,则该字符串将被视为 Python 字面量并解析为命令列表。

参数:

command (list[str] | str | None) – Docker 命令或入口点

返回:

命令(或命令列表)

返回类型:

list[str] | str | None

on_kill()[source]

覆盖此方法可在任务实例被终止时清理子进程。

在操作符中使用 threading、subprocess 或 multiprocessing 模块都需要清理,否则会留下僵尸进程。

static unpack_environment_variables(env_str)[source]

从字符串解析环境变量。

参数:

env_str (str) – 环境变量格式为 {key}={value},由 \n(换行符)分隔

返回:

包含解析后的环境变量的字典

返回类型:

dict

此条目是否有帮助?