Amazon Elastic Container Service (ECS)¶
Amazon Elastic Container Service (Amazon ECS) 是一种完全托管的容器编排服务,可让您轻松部署、管理和扩展容器化应用程序。
Airflow 提供操作符以在 ECS 集群上运行任务定义。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅Airflow® 的安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。 如果此参数设置为
None
,则使用默认 boto3 行为,而不进行连接查找。否则,请使用连接中存储的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则将使用来自 AWS 连接额外参数的 region_name。否则,请使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。 如果您想使用与 botocore 使用的 CA 证书捆绑包不同的 CA 证书捆绑包,则可以指定此参数。
如果此参数设置为
None
或省略,则将使用来自 AWS 连接额外参数 的 verify。否则,请使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构造 botocore.config.Config。 此配置可用于配置 避免节流异常、超时等。
{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则将使用来自 AWS 连接额外参数 的 config_kwargs。否则,请使用指定的值而不是连接值。默认值:None
注意
指定空字典
{}
将覆盖 botocore.config.Config 的连接配置
操作符¶
创建 AWS ECS 集群¶
要创建 Amazon ECS 集群,可以使用 EcsCreateClusterOperator
。
要传递到 Create Cluster API 的所有可选参数都应在 'create_cluster_kwargs' 字典中传递。
tests/system/amazon/aws/example_ecs.py
create_cluster = EcsCreateClusterOperator(
task_id="create_cluster",
cluster_name=new_cluster_name,
)
删除 AWS ECS 集群¶
要删除 Amazon ECS 集群,可以使用 EcsDeleteClusterOperator
。
tests/system/amazon/aws/example_ecs.py
delete_cluster = EcsDeleteClusterOperator(
task_id="delete_cluster",
cluster_name=new_cluster_name,
)
注册任务定义¶
要注册任务定义,可以使用 EcsRegisterTaskDefinitionOperator
。
要传递到 Register Task Definition API 的所有可选参数都应在 'register_task_kwargs' 字典中传递。
tests/system/amazon/aws/example_ecs.py
register_task = EcsRegisterTaskDefinitionOperator(
task_id="register_task",
family=family_name,
container_definitions=[
{
"name": container_name,
"image": "ubuntu",
"workingDirectory": "/usr/bin",
"entryPoint": ["sh", "-c"],
"command": ["ls"],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": log_group_name,
"awslogs-region": aws_region,
"awslogs-create-group": "true",
"awslogs-stream-prefix": "ecs",
},
},
},
],
register_task_kwargs={
"cpu": "256",
"memory": "512",
"networkMode": "awsvpc",
},
)
注销任务定义¶
要注销任务定义,可以使用 EcsDeregisterTaskDefinitionOperator
。
tests/system/amazon/aws/example_ecs.py
deregister_task = EcsDeregisterTaskDefinitionOperator(
task_id="deregister_task",
task_definition=register_task.output,
)
运行任务定义¶
要在 Amazon ECS 集群中运行定义的任务定义,可以使用 EcsRunTaskOperator
。
您需要先创建 ECS 集群并创建任务定义,然后才能使用此操作符。任务定义包含要运行的容器化应用程序的详细信息。
此操作符支持在无服务器 (FARGATE)、通过 EC2 或通过外部资源 (EXTERNAL) 的 ECS 集群中运行您的容器。您需要为此操作符配置的参数将取决于您要使用的 launch_type
。
launch_type="EC2|FARGATE|EXTERNAL"
如果您在 ECS 集群中使用 AWS Fargate 作为计算资源,请将参数
launch_type
设置为 FARGATE。当使用 FARGATE 的启动类型时,您需要提供network_configuration
参数。如果您在 ECS 集群中使用 EC2 作为计算资源,请将参数设置为 EC2。
如果您的 ECS 集群中集成了外部资源(例如使用 ECS Anywhere),并且希望在这些外部资源上运行容器,请将参数设置为 EXTERNAL。
tests/system/amazon/aws/example_ecs.py
run_task = EcsRunTaskOperator(
task_id="run_task",
cluster=existing_cluster_name,
task_definition=register_task.output,
overrides={
"containerOverrides": [
{
"name": container_name,
"command": ["echo hello world"],
},
],
},
network_configuration={"awsvpcConfiguration": {"subnets": existing_cluster_subnets}},
awslogs_group=log_group_name,
awslogs_region=aws_region,
awslogs_stream_prefix=f"ecs/{container_name}",
)
tests/system/amazon/aws/example_ecs_fargate.py
hello_world = EcsRunTaskOperator(
task_id="hello_world",
cluster=cluster_name,
task_definition=task_definition_name,
launch_type="FARGATE",
overrides={
"containerOverrides": [
{
"name": container_name,
"command": ["echo", "hello", "world"],
},
],
},
network_configuration={
"awsvpcConfiguration": {
"subnets": test_context[SUBNETS_KEY],
"securityGroups": test_context[SECURITY_GROUPS_KEY],
"assignPublicIp": "ENABLED",
},
},
)
将日志流式传输到 AWS CloudWatch¶
要将日志流式传输到 AWS CloudWatch,您需要定义以下参数。使用上面的示例,我们将添加这些附加参数以启用 CloudWatch 日志记录。您需要确保您具有适当的权限级别(请参阅下一节)。
tests/system/amazon/aws/example_ecs.py
awslogs_group=log_group_name,
awslogs_region=aws_region,
awslogs_stream_prefix=f"ecs/{container_name}",
IAM 权限¶
您需要确保您具有以下 IAM 权限才能通过 EcsRunTaskOperator 运行任务
{
"Effect": "Allow",
"Action": [
"ecs:RunTask",
"ecs:DescribeTasks",
]
"Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:task_definition/{task definition family}" ]
},
{
"Effect": "Allow",
"Action": [
"iam:PassRole"
]
"Resource": [ "arn:aws:iam::{aws account number}:role/{task execution role name}" ]
},
{
"Effect": "Allow",
"Action": [
"ecs:DescribeTasks",
],
"Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:task/{ecs cluster name}/*" ]
}
如果您使用“reattach=True”(默认值为 False),则需要添加更多权限。您需要向 IAM 策略添加以下附加操作。
"ecs:DescribeTaskDefinition",
"ecs:ListTasks"
CloudWatch 权限
如果您计划将 Apache Airflow 日志流式传输到 AWS CloudWatch,则需要确保已配置适当的权限集。
iam.PolicyStatement(
actions=[
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents",
"logs:GetLogEvents",
"logs:GetLogRecord",
"logs:GetLogGroupFields",
"logs:GetQueryResults"
],
effect=iam.Effect.ALLOW,
resources=[
"arn:aws:logs:{aws region}:{aws account number}:log-group:{aws-log-group-name}:log-stream:{aws-log-stream-name}/\*"
]
)
传感器¶
AWS ECS 集群状态传感器¶
要轮询集群状态直到它达到终端状态,可以使用 EcsClusterStateSensor
。
默认为 EcsClusterStates.ACTIVE 作为成功状态,没有失败状态,两者都可以使用提供的值覆盖。 如果提供了失败状态并且该状态在达到目标状态之前达到,则会引发带有失败原因的 AirflowException。
tests/system/amazon/aws/example_ecs.py
await_cluster = EcsClusterStateSensor(
task_id="await_cluster",
cluster_name=new_cluster_name,
)
AWS ECS 任务定义状态传感器¶
要轮询任务定义状态直到它达到终端状态,可以使用 EcsTaskDefinitionStateSensor
。
有效状态为 EcsTaskDefinitionStates.ACTIVE 或 EcsTaskDefinitionStates.INACTIVE。 默认为 EcsTaskDefinitionStates.ACTIVE 作为成功状态,但接受更改该状态的参数。 如果在达到目标状态之前达到失败状态,则会引发带有失败原因的 AirflowException。
tests/system/amazon/aws/example_ecs.py
await_task_definition = EcsTaskDefinitionStateSensor(
task_id="await_task_definition",
task_definition=register_task.output,
)
AWS ECS 任务状态传感器¶
要轮询任务状态直到其达到终端状态,您可以使用 EcsTaskStateSensor
。
默认情况下,成功状态为 EcsTaskStates.RUNNING,没有失败状态,这两个都可以通过提供的值进行覆盖。如果提供了失败状态,并且在达到目标状态之前达到了该状态,则会引发带有失败原因的 AirflowException。
tests/system/amazon/aws/example_ecs_fargate.py
# By default, EcsTaskStateSensor waits until the task has started, but the
# demo task runs so fast that the sensor misses it. This sensor instead
# demonstrates how to wait until the ECS Task has completed by providing
# the target_state and failure_states parameters.
await_task_finish = EcsTaskStateSensor(
task_id="await_task_finish",
cluster=cluster_name,
task=hello_world.output["ecs_task_arn"],
target_state=EcsTaskStates.STOPPED,
failure_states={EcsTaskStates.NONE},
)