airflow.providers.amazon.aws.operators.ecs

EcsBaseOperator

这是所有弹性容器服务 (Elastic Container Service) 算子的基类。

EcsCreateClusterOperator

创建一个 AWS ECS 集群。

EcsDeleteClusterOperator

删除一个 AWS ECS 集群。

EcsDeregisterTaskDefinitionOperator

在 AWS ECS 上注销一个任务定义。

EcsRegisterTaskDefinitionOperator

在 AWS ECS 上注册一个任务定义。

EcsRunTaskOperator

在 AWS ECS (弹性容器服务) 上执行一个任务。

模块内容

class airflow.providers.amazon.aws.operators.ecs.EcsBaseOperator(*, aws_conn_id='aws_default', region_name=None, verify=None, botocore_config=None, region=NOTSET, **kwargs)[source]

基类: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.ecs.EcsHook]

这是所有弹性容器服务 (Elastic Container Service) 算子的基类。

aws_hook_class[source]
property client: boto3.client[source]

创建并返回 EcsHook 的客户端。

abstract execute(context)[source]

必须在子类中重写。

class airflow.providers.amazon.aws.operators.ecs.EcsCreateClusterOperator(*, cluster_name, create_cluster_kwargs=None, wait_for_completion=True, waiter_delay=15, waiter_max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: EcsBaseOperator

创建一个 AWS ECS 集群。

另请参阅

有关如何使用此算子的更多信息,请参阅指南:创建一个 AWS ECS 集群

参数:
  • cluster_name (str) – 您的集群名称。如果您不为集群指定名称,则将创建一个名为 default 的集群。

  • create_cluster_kwargs (dict | None) – 集群创建的额外参数。

  • wait_for_completion (bool) – 如果为 True,则等待集群创建完成。(默认值: True)

  • waiter_delay (int) – 两次尝试之间等待的时间间隔(秒),如果未设置,则使用默认的等待者 (waiter) 值。

  • waiter_max_attempts (int) – 最大尝试次数,如果未设置,则使用默认的等待者 (waiter) 值。

  • deferrable (bool) – 如果为 True,算子将异步等待作业完成。这意味着需要等待完成。此模式需要安装 aiobotocore 模块。(默认值: False)

template_fields: collections.abc.Sequence[str][source]
cluster_name[source]
create_cluster_kwargs[source]
wait_for_completion = True[source]
waiter_delay = 15[source]
waiter_max_attempts = 60[source]
deferrable = True[source]
execute(context)[source]

必须在子类中重写。

class airflow.providers.amazon.aws.operators.ecs.EcsDeleteClusterOperator(*, cluster_name, wait_for_completion=True, waiter_delay=15, waiter_max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: EcsBaseOperator

删除一个 AWS ECS 集群。

另请参阅

有关如何使用此算子的更多信息,请参阅指南:删除一个 AWS ECS 集群

参数:
  • cluster_name (str) – 要删除的集群的简称或完整的 Amazon Resource Name (ARN)。

  • wait_for_completion (bool) – 如果为 True,则等待集群创建完成。(默认值: True)

  • waiter_delay (int) – 两次尝试之间等待的时间间隔(秒),如果未设置,则使用默认的等待者 (waiter) 值。

  • waiter_max_attempts (int) – 最大尝试次数,如果未设置,则使用默认的等待者 (waiter) 值。

  • deferrable (bool) – 如果为 True,算子将异步等待作业完成。这意味着需要等待完成。此模式需要安装 aiobotocore 模块。(默认值: False)

template_fields: collections.abc.Sequence[str] = ('cluster_name', 'wait_for_completion', 'deferrable')[source]
cluster_name[source]
wait_for_completion = True[source]
waiter_delay = 15[source]
waiter_max_attempts = 60[source]
deferrable = True[source]
execute(context)[source]

必须在子类中重写。

class airflow.providers.amazon.aws.operators.ecs.EcsDeregisterTaskDefinitionOperator(*, task_definition, **kwargs)[source]

基类: EcsBaseOperator

在 AWS ECS 上注销一个任务定义。

另请参阅

有关如何使用此算子的更多信息,请参阅指南:注销一个任务定义

参数:

task_definition (str) – 要注销的任务定义的族和修订版本 (family:revision) 或完整的 Amazon Resource Name (ARN)。如果您使用族名称,则必须指定修订版本。

template_fields: collections.abc.Sequence[str] = ('task_definition',)[source]
task_definition[source]
execute(context)[source]

必须在子类中重写。

class airflow.providers.amazon.aws.operators.ecs.EcsRegisterTaskDefinitionOperator(*, family, container_definitions, register_task_kwargs=None, **kwargs)[source]

基类: EcsBaseOperator

在 AWS ECS 上注册一个任务定义。

另请参阅

有关如何使用此算子的更多信息,请参阅指南:注册一个任务定义

参数:
  • family (str) – 要创建的任务定义的族名称。

  • container_definitions (list[dict]) – 一个包含容器定义的 JSON 格式列表,用于描述构成您的任务的不同容器。

  • register_task_kwargs (dict | None) – 注册任务定义的额外参数。

template_fields: collections.abc.Sequence[str] = ('family', 'container_definitions', 'register_task_kwargs')[source]
family[source]
container_definitions[source]
register_task_kwargs[source]
execute(context)[source]

必须在子类中重写。

class airflow.providers.amazon.aws.operators.ecs.EcsRunTaskOperator(*, task_definition, cluster, overrides, launch_type='EC2', capacity_provider_strategy=None, volume_configurations=None, group=None, placement_constraints=None, placement_strategy=None, platform_version=None, network_configuration=None, tags=None, awslogs_group=None, awslogs_region=None, awslogs_stream_prefix=None, awslogs_fetch_interval=timedelta(seconds=30), container_name=None, propagate_tags=None, quota_retry=None, reattach=False, number_logs_exception=10, wait_for_completion=True, waiter_delay=6, waiter_max_attempts=1000000, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: EcsBaseOperator

在 AWS ECS (弹性容器服务) 上执行一个任务。

另请参阅

有关如何使用此算子的更多信息,请参阅指南:运行一个任务

参数:
  • task_definition (str) – 弹性容器服务上的任务定义名称

  • cluster (str) – 弹性容器服务上的集群名称

  • overrides (dict) – boto3 将接收的相同参数(可模板化):https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task

  • aws_conn_id – AWS 凭据/区域名称的连接 ID。如果为 None,将使用 boto3 凭据策略 (https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html)。

  • region – 要在 AWS Hook 中使用的区域名称。覆盖连接中的区域(如果提供)

  • launch_type (str) – 运行任务的启动类型(‘EC2’、‘EXTERNAL’ 或 ‘FARGATE’)

  • capacity_provider_strategy (list | None) – 用于任务的容量提供程序策略。指定 capacity_provider_strategy 时,将省略 launch_type 参数。如果未指定 capacity_provider_strategy 或 launch_type,则使用集群的默认容量提供程序策略。

  • volume_configurations (list | None) – 使用容量提供程序时使用的卷配置。卷名称必须与任务定义中的名称匹配。您可以配置大小、卷类型、IOPS、吞吐量等设置,详情请参阅 (https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_TaskManagedEBSVolumeConfiguration.html)

  • group (str | None) – 与任务关联的任务组名称

  • placement_constraints (list | None) – 用于任务的放置约束对象数组

  • placement_strategy (list | None) – 用于任务的放置策略对象数组

  • platform_version (str | None) – 任务运行所在的平台版本

  • network_configuration (dict | None) – 任务的网络配置

  • tags (dict | None) – 一个标签字典,格式为 {‘tagKey’: ‘tagValue’}。

  • awslogs_group (str | None) – 存储 ECS 容器日志的 CloudWatch 日志组。只有在作业完成后希望日志显示在 Airflow UI 中时才需要此参数。

  • awslogs_region (str | None) – 存储 CloudWatch 日志的区域。如果为 None,则与 region 参数相同。如果该参数也为 None,则使用基于连接设置的默认 AWS 区域。

  • awslogs_stream_prefix (str | None) – 用于 CloudWatch 日志的流前缀。这应与任务定义的日志配置中指定的前缀匹配。只有在作业完成后希望日志显示在 Airflow UI 中时才需要此参数。

  • awslogs_fetch_interval (datetime.timedelta) – ECS 任务日志抓取器在每次抓取 CloudWatch 日志之间应等待的时间间隔。如果 deferrable 设置为 True,则忽略此参数,而改用 waiter_delay。

  • container_name (str | None) – 要从中抓取日志的容器名称。如果未设置,则使用第一个容器。

  • quota_retry (dict | None) – 配置是否以及如何重试启动新的 ECS 任务,以处理瞬时错误。

  • reattach (bool) – 如果设置为 True,将检查任务实例先前启动的任务是否已在运行。如果已运行,算子将附加到该任务而不是启动一个新任务。这是为了避免在任务运行时 Airflow 和 ECS 之间的连接断开时(例如当 Airflow worker 重启时)重新启动一个新任务。

  • number_logs_exception (int) – 如果 ECS 任务停止,在 AirflowException 中返回的 CloudWatch 日志的最后行数(用于接收包含 ECS 中运行代码失败日志的 Airflow 警报)。

  • wait_for_completion (bool) – 如果为 True,则等待集群创建完成。(默认值: True)

  • waiter_delay (int) – 两次尝试之间等待的时间间隔(秒),如果未设置,则使用默认的等待者 (waiter) 值。

  • waiter_max_attempts (int) – 最大尝试次数,如果未设置,则使用默认的等待者 (waiter) 值。

  • deferrable (bool) – 如果为 True,算子将异步等待作业完成。这意味着需要等待完成。此模式需要安装 aiobotocore 模块。(默认值: False)

  • do_xcom_push – 如果为 True,算子将把 ECS 任务 ARN 推送到 XCom,键为 ‘ecs_task_arn’。此外,如果抓取了日志,最后一条日志消息将推送到 XCom,键为 ‘return_value’。(默认值: False)

ui_color = '#f0ede4'[source]
template_fields: collections.abc.Sequence[str] = ('task_definition', 'cluster', 'overrides', 'launch_type', 'capacity_provider_strategy',...[source]
template_fields_renderers[source]
task_definition[source]
cluster[source]
overrides[source]
launch_type = 'EC2'[source]
capacity_provider_strategy = None[source]
volume_configurations = None[source]
group = None[source]
placement_constraints = None[source]
placement_strategy = None[source]
platform_version = None[source]
network_configuration = None[source]
tags = None[source]
awslogs_group = None[source]
awslogs_stream_prefix = None[source]
awslogs_region = None[source]
awslogs_fetch_interval[source]
propagate_tags = None[source]
reattach = False[source]
number_logs_exception = 10[source]
arn: str | None = None[source]
container_name: str | None = None[source]
retry_args = None[source]
task_log_fetcher: airflow.providers.amazon.aws.utils.task_log_fetcher.AwsTaskLogFetcher | None = None[source]
wait_for_completion = True[source]
waiter_delay = 6[source]
waiter_max_attempts = 1000000[source]
deferrable = True[source]
execute(context)[source]

必须在子类中重写。

execute_complete(context, event=None)[source]
on_kill()[source]

覆盖此方法以在任务实例被终止时清理子进程。

在操作器中使用 threading、subprocess 或 multiprocessing 模块的任何部分都需要清理,否则会留下僵尸进程。

此条目有帮助吗?