Amazon Elastic Container Service (ECS)

Amazon Elastic Container Service (Amazon ECS) 是一项完全托管式的容器编排服务,可帮助您轻松部署、管理和扩展容器化应用程序。

Airflow 提供 Operators 来在 ECS 集群上运行 Task Definitions。

前提任务

要使用这些 Operators,您需要完成一些准备工作

通用参数

aws_conn_id

引用 Amazon Web Services 连接 ID。如果此参数设置为 None,则使用默认 boto3 行为而不进行连接查找。否则,使用 Connection 中存储的凭据。默认值:aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果您想使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,可以指定此参数。

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None

botocore_config

提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免节流异常、超时等。

示例,有关参数的更多详细信息,请参阅 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None

注意

指定一个空字典,{},将覆盖 botocore.config.Config 的连接配置。

Operators

创建 AWS ECS 集群

要创建 Amazon ECS 集群,您可以使用 EcsCreateClusterOperator

传递给 Create Cluster API 的所有可选参数应在 ‘create_cluster_kwargs’ 字典中传递。

tests/system/amazon/aws/example_ecs.py

create_cluster = EcsCreateClusterOperator(
    task_id="create_cluster",
    cluster_name=new_cluster_name,
)

删除 AWS ECS 集群

要删除 Amazon ECS 集群,您可以使用 EcsDeleteClusterOperator

tests/system/amazon/aws/example_ecs.py

delete_cluster = EcsDeleteClusterOperator(
    task_id="delete_cluster",
    cluster_name=new_cluster_name,
)

注册 Task Definition

要注册 Task Definition,您可以使用 EcsRegisterTaskDefinitionOperator

传递给 Register Task Definition API 的所有可选参数应在 ‘register_task_kwargs’ 字典中传递。

tests/system/amazon/aws/example_ecs.py

register_task = EcsRegisterTaskDefinitionOperator(
    task_id="register_task",
    family=family_name,
    container_definitions=[
        {
            "name": container_name,
            "image": "ubuntu",
            "workingDirectory": "/usr/bin",
            "entryPoint": ["sh", "-c"],
            "command": ["ls"],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": log_group_name,
                    "awslogs-region": aws_region,
                    "awslogs-create-group": "true",
                    "awslogs-stream-prefix": "ecs",
                },
            },
        },
    ],
    register_task_kwargs={
        "cpu": "256",
        "memory": "512",
        "networkMode": "awsvpc",
    },
)

注销 Task Definition

要注销 Task Definition,您可以使用 EcsDeregisterTaskDefinitionOperator

tests/system/amazon/aws/example_ecs.py

deregister_task = EcsDeregisterTaskDefinitionOperator(
    task_id="deregister_task",
    task_definition=register_task.output,
)

运行 Task Definition

要在 Amazon ECS 集群中运行 Task Definition,您可以使用 EcsRunTaskOperator

在使用此 Operator 之前,您需要已创建 ECS 集群和 Task Definition。Task Definition 包含您要运行的容器化应用程序的详细信息。

此 Operator 支持在无服务器 (FARGATE)、通过 EC2 或通过外部资源 (EXTERNAL) 的 ECS 集群中运行您的容器。此 Operator 需要配置的参数将取决于您想要使用的 launch_type

launch_type="EC2|FARGATE|EXTERNAL"
  • 如果您在 ECS 集群中使用 AWS Fargate 作为计算资源,请将参数 launch_type 设置为 FARGATE。使用 FARGATE 启动类型时,您需要提供 network_configuration 参数。

  • 如果您在 ECS 集群中使用 EC2 作为计算资源,请将参数设置为 EC2。

  • 如果您在 ECS 集群中集成了外部资源(例如使用 ECS Anywhere),并希望在这些外部资源上运行您的容器,请将参数设置为 EXTERNAL。

tests/system/amazon/aws/example_ecs.py

run_task = EcsRunTaskOperator(
    task_id="run_task",
    cluster=existing_cluster_name,
    task_definition=register_task.output,
    overrides={
        "containerOverrides": [
            {
                "name": container_name,
                "command": ["echo hello world"],
            },
        ],
    },
    network_configuration={"awsvpcConfiguration": {"subnets": existing_cluster_subnets}},
    awslogs_group=log_group_name,
    awslogs_region=aws_region,
    awslogs_stream_prefix=f"ecs/{container_name}",
)

tests/system/amazon/aws/example_ecs_fargate.py

hello_world = EcsRunTaskOperator(
    task_id="hello_world",
    cluster=cluster_name,
    task_definition=task_definition_name,
    launch_type="FARGATE",
    overrides={
        "containerOverrides": [
            {
                "name": container_name,
                "command": ["echo", "hello", "world"],
            },
        ],
    },
    network_configuration={
        "awsvpcConfiguration": {
            "subnets": test_context[SUBNETS_KEY],
            "securityGroups": test_context[SECURITY_GROUPS_KEY],
            "assignPublicIp": "ENABLED",
        },
    },
)

将日志流式传输到 AWS CloudWatch

要将日志流式传输到 AWS CloudWatch,您需要定义以下参数。使用上面的示例,我们将添加这些额外参数以启用日志记录到 CloudWatch。您需要确保您具有适当的权限级别(参见下一节)。

tests/system/amazon/aws/example_ecs.py

awslogs_group=log_group_name,
awslogs_region=aws_region,
awslogs_stream_prefix=f"ecs/{container_name}",

IAM 权限

您需要确保您拥有以下 IAM 权限才能通过 EcsRunTaskOperator 运行任务

{
    "Effect": "Allow",
    "Action": [
        "ecs:RunTask",
        "ecs:DescribeTasks",
    ]
    "Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:task_definition/{task definition family}" ]
},
{
    "Effect": "Allow",
    "Action": [
        "iam:PassRole"
    ]
    "Resource": [ "arn:aws:iam::{aws account number}:role/{task execution role name}" ]
},
{
    "Effect": "Allow",
    "Action": [
      "ecs:DescribeTasks",
    ],
    "Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:task/{ecs cluster name}/*" ]
}

如果您使用 “reattach=True”(默认值为 False),您需要添加更多权限。您需要将以下额外 Actions 添加到 IAM policy 中。

"ecs:DescribeTaskDefinition",
"ecs:ListTasks"

CloudWatch 权限

如果您计划将 Apache Airflow 日志流式传输到 AWS CloudWatch 中,您需要确保已配置适当的权限集。

iam.PolicyStatement(
    actions=[
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents",
        "logs:GetLogEvents",
        "logs:GetLogRecord",
        "logs:GetLogGroupFields",
        "logs:GetQueryResults"
    ],
    effect=iam.Effect.ALLOW,
    resources=[
        "arn:aws:logs:{aws region}:{aws account number}:log-group:{aws-log-group-name}:log-stream:{aws-log-stream-name}/\*"
        ]
)

Sensors

AWS ECS 集群状态 Sensor

要轮询集群状态直到其达到最终状态,您可以使用 EcsClusterStateSensor

默认为 EcsClusterStates.ACTIVE 作为成功状态,没有失败状态,两者都可以通过提供的值覆盖。如果提供了失败状态且在达到目标状态之前达到该状态,则抛出带有失败原因的 AirflowException。

tests/system/amazon/aws/example_ecs.py

await_cluster = EcsClusterStateSensor(
    task_id="await_cluster",
    cluster_name=new_cluster_name,
)

AWS ECS Task Definition 状态 Sensor

要轮询 Task Definition 状态直到其达到最终状态,您可以使用 EcsTaskDefinitionStateSensor

有效状态为 EcsTaskDefinitionStates.ACTIVE 或 EcsTaskDefinitionStates.INACTIVE。默认为 EcsTaskDefinitionStates.ACTIVE 作为成功状态,但也接受一个参数来更改。如果在达到目标状态之前达到失败状态,则抛出带有失败原因的 AirflowException。

tests/system/amazon/aws/example_ecs.py

await_task_definition = EcsTaskDefinitionStateSensor(
    task_id="await_task_definition",
    task_definition=register_task.output,
)

AWS ECS Task 状态 Sensor

要轮询 Task 状态直到其达到最终状态,您可以使用 EcsTaskStateSensor

默认为 EcsTaskStates.RUNNING 作为成功状态,没有失败状态,两者都可以通过提供的值覆盖。如果提供了失败状态且在达到目标状态之前达到该状态,则抛出带有失败原因的 AirflowException。

tests/system/amazon/aws/example_ecs_fargate.py

# By default, EcsTaskStateSensor waits until the task has started, but the
# demo task runs so fast that the sensor misses it.  This sensor instead
# demonstrates how to wait until the ECS Task has completed by providing
# the target_state and failure_states parameters.
await_task_finish = EcsTaskStateSensor(
    task_id="await_task_finish",
    cluster=cluster_name,
    task=hello_world.output["ecs_task_arn"],
    target_state=EcsTaskStates.STOPPED,
    failure_states={EcsTaskStates.NONE},
)

参考资料

本条目有帮助吗?