Amazon Elastic Container Service (ECS)¶
Amazon Elastic Container Service (Amazon ECS) 是一项完全托管式的容器编排服务,可帮助您轻松部署、管理和扩展容器化应用程序。
Airflow 提供 Operators 来在 ECS 集群上运行 Task Definitions。
前提任务¶
要使用这些 Operators,您需要完成一些准备工作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认 boto3 行为而不进行连接查找。否则,使用 Connection 中存储的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果您想使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,可以指定此参数。
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免节流异常、超时等。
示例,有关参数的更多详细信息,请参阅 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None
注意
指定一个空字典,
{}
,将覆盖 botocore.config.Config 的连接配置。
Operators¶
创建 AWS ECS 集群¶
要创建 Amazon ECS 集群,您可以使用 EcsCreateClusterOperator
。
传递给 Create Cluster API 的所有可选参数应在 ‘create_cluster_kwargs’ 字典中传递。
tests/system/amazon/aws/example_ecs.py
create_cluster = EcsCreateClusterOperator(
task_id="create_cluster",
cluster_name=new_cluster_name,
)
删除 AWS ECS 集群¶
要删除 Amazon ECS 集群,您可以使用 EcsDeleteClusterOperator
。
tests/system/amazon/aws/example_ecs.py
delete_cluster = EcsDeleteClusterOperator(
task_id="delete_cluster",
cluster_name=new_cluster_name,
)
注册 Task Definition¶
要注册 Task Definition,您可以使用 EcsRegisterTaskDefinitionOperator
。
传递给 Register Task Definition API 的所有可选参数应在 ‘register_task_kwargs’ 字典中传递。
tests/system/amazon/aws/example_ecs.py
register_task = EcsRegisterTaskDefinitionOperator(
task_id="register_task",
family=family_name,
container_definitions=[
{
"name": container_name,
"image": "ubuntu",
"workingDirectory": "/usr/bin",
"entryPoint": ["sh", "-c"],
"command": ["ls"],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": log_group_name,
"awslogs-region": aws_region,
"awslogs-create-group": "true",
"awslogs-stream-prefix": "ecs",
},
},
},
],
register_task_kwargs={
"cpu": "256",
"memory": "512",
"networkMode": "awsvpc",
},
)
注销 Task Definition¶
要注销 Task Definition,您可以使用 EcsDeregisterTaskDefinitionOperator
。
tests/system/amazon/aws/example_ecs.py
deregister_task = EcsDeregisterTaskDefinitionOperator(
task_id="deregister_task",
task_definition=register_task.output,
)
运行 Task Definition¶
要在 Amazon ECS 集群中运行 Task Definition,您可以使用 EcsRunTaskOperator
。
在使用此 Operator 之前,您需要已创建 ECS 集群和 Task Definition。Task Definition 包含您要运行的容器化应用程序的详细信息。
此 Operator 支持在无服务器 (FARGATE)、通过 EC2 或通过外部资源 (EXTERNAL) 的 ECS 集群中运行您的容器。此 Operator 需要配置的参数将取决于您想要使用的 launch_type
。
launch_type="EC2|FARGATE|EXTERNAL"
如果您在 ECS 集群中使用 AWS Fargate 作为计算资源,请将参数
launch_type
设置为 FARGATE。使用 FARGATE 启动类型时,您需要提供network_configuration
参数。如果您在 ECS 集群中使用 EC2 作为计算资源,请将参数设置为 EC2。
如果您在 ECS 集群中集成了外部资源(例如使用 ECS Anywhere),并希望在这些外部资源上运行您的容器,请将参数设置为 EXTERNAL。
tests/system/amazon/aws/example_ecs.py
run_task = EcsRunTaskOperator(
task_id="run_task",
cluster=existing_cluster_name,
task_definition=register_task.output,
overrides={
"containerOverrides": [
{
"name": container_name,
"command": ["echo hello world"],
},
],
},
network_configuration={"awsvpcConfiguration": {"subnets": existing_cluster_subnets}},
awslogs_group=log_group_name,
awslogs_region=aws_region,
awslogs_stream_prefix=f"ecs/{container_name}",
)
tests/system/amazon/aws/example_ecs_fargate.py
hello_world = EcsRunTaskOperator(
task_id="hello_world",
cluster=cluster_name,
task_definition=task_definition_name,
launch_type="FARGATE",
overrides={
"containerOverrides": [
{
"name": container_name,
"command": ["echo", "hello", "world"],
},
],
},
network_configuration={
"awsvpcConfiguration": {
"subnets": test_context[SUBNETS_KEY],
"securityGroups": test_context[SECURITY_GROUPS_KEY],
"assignPublicIp": "ENABLED",
},
},
)
将日志流式传输到 AWS CloudWatch¶
要将日志流式传输到 AWS CloudWatch,您需要定义以下参数。使用上面的示例,我们将添加这些额外参数以启用日志记录到 CloudWatch。您需要确保您具有适当的权限级别(参见下一节)。
tests/system/amazon/aws/example_ecs.py
awslogs_group=log_group_name,
awslogs_region=aws_region,
awslogs_stream_prefix=f"ecs/{container_name}",
IAM 权限¶
您需要确保您拥有以下 IAM 权限才能通过 EcsRunTaskOperator 运行任务
{
"Effect": "Allow",
"Action": [
"ecs:RunTask",
"ecs:DescribeTasks",
]
"Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:task_definition/{task definition family}" ]
},
{
"Effect": "Allow",
"Action": [
"iam:PassRole"
]
"Resource": [ "arn:aws:iam::{aws account number}:role/{task execution role name}" ]
},
{
"Effect": "Allow",
"Action": [
"ecs:DescribeTasks",
],
"Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:task/{ecs cluster name}/*" ]
}
如果您使用 “reattach=True”(默认值为 False),您需要添加更多权限。您需要将以下额外 Actions 添加到 IAM policy 中。
"ecs:DescribeTaskDefinition",
"ecs:ListTasks"
CloudWatch 权限
如果您计划将 Apache Airflow 日志流式传输到 AWS CloudWatch 中,您需要确保已配置适当的权限集。
iam.PolicyStatement(
actions=[
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents",
"logs:GetLogEvents",
"logs:GetLogRecord",
"logs:GetLogGroupFields",
"logs:GetQueryResults"
],
effect=iam.Effect.ALLOW,
resources=[
"arn:aws:logs:{aws region}:{aws account number}:log-group:{aws-log-group-name}:log-stream:{aws-log-stream-name}/\*"
]
)
Sensors¶
AWS ECS 集群状态 Sensor¶
要轮询集群状态直到其达到最终状态,您可以使用 EcsClusterStateSensor
。
默认为 EcsClusterStates.ACTIVE 作为成功状态,没有失败状态,两者都可以通过提供的值覆盖。如果提供了失败状态且在达到目标状态之前达到该状态,则抛出带有失败原因的 AirflowException。
tests/system/amazon/aws/example_ecs.py
await_cluster = EcsClusterStateSensor(
task_id="await_cluster",
cluster_name=new_cluster_name,
)
AWS ECS Task Definition 状态 Sensor¶
要轮询 Task Definition 状态直到其达到最终状态,您可以使用 EcsTaskDefinitionStateSensor
。
有效状态为 EcsTaskDefinitionStates.ACTIVE 或 EcsTaskDefinitionStates.INACTIVE。默认为 EcsTaskDefinitionStates.ACTIVE 作为成功状态,但也接受一个参数来更改。如果在达到目标状态之前达到失败状态,则抛出带有失败原因的 AirflowException。
tests/system/amazon/aws/example_ecs.py
await_task_definition = EcsTaskDefinitionStateSensor(
task_id="await_task_definition",
task_definition=register_task.output,
)
AWS ECS Task 状态 Sensor¶
要轮询 Task 状态直到其达到最终状态,您可以使用 EcsTaskStateSensor
。
默认为 EcsTaskStates.RUNNING 作为成功状态,没有失败状态,两者都可以通过提供的值覆盖。如果提供了失败状态且在达到目标状态之前达到该状态,则抛出带有失败原因的 AirflowException。
tests/system/amazon/aws/example_ecs_fargate.py
# By default, EcsTaskStateSensor waits until the task has started, but the
# demo task runs so fast that the sensor misses it. This sensor instead
# demonstrates how to wait until the ECS Task has completed by providing
# the target_state and failure_states parameters.
await_task_finish = EcsTaskStateSensor(
task_id="await_task_finish",
cluster=cluster_name,
task=hello_world.output["ecs_task_arn"],
target_state=EcsTaskStates.STOPPED,
failure_states={EcsTaskStates.NONE},
)