Amazon Elastic Kubernetes Service (EKS)¶
Amazon Elastic Kubernetes Service (Amazon EKS) 是一项托管服务,可让您轻松在 AWS 上运行 Kubernetes,而无需自行建立或维护 Kubernetes 控制平面。Kubernetes 是一个开源系统,用于自动化容器化应用的部署、扩缩和管理。
Airflow 提供了用于创建 EKS 集群和计算基础设施以及与其交互的 Operator。
前置任务¶
要使用这些 Operator,您必须执行以下几项操作
使用 AWS Console 或 AWS CLI 创建必要的资源。
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 安装 Airflow®
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,无需查找连接。否则,使用 Connection 中存储的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值代替连接中的值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果您想使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,可以指定此参数。
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 verify。否则,使用指定的值代替连接中的值。默认值:None
- botocore_config
提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。
示例,有关参数的更多详细信息,请参阅 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值代替连接中的值。默认值:None
注意
指定一个空字典,
{}
,将覆盖 botocore.config.Config 的连接配置
Operator¶
创建 Amazon EKS 集群¶
要创建 Amazon EKS 集群,您可以使用 EksCreateClusterOperator
。
- 注意:需要具有以下权限的 AWS IAM 角色
eks.amazonaws.com
必须添加到信任关系中AmazonEKSClusterPolicy
IAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_nodegroups.py
# Create an Amazon EKS Cluster control plane without attaching compute service.
create_cluster = EksCreateClusterOperator(
task_id="create_cluster",
cluster_name=cluster_name,
cluster_role_arn=test_context[ROLE_ARN_KEY],
resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
compute=None,
)
一步创建 Amazon EKS 集群和节点组¶
要一步创建 Amazon EKS 集群和 EKS 托管节点组,您可以使用 EksCreateClusterOperator
。
- 注意:需要具有以下权限的 AWS IAM 角色
ec2.amazon.aws.com
必须在信任关系中eks.amazonaws.com
必须添加到信任关系中AmazonEC2ContainerRegistryReadOnly
IAM Policy 必须附加AmazonEKSClusterPolicy
IAM Policy 必须附加AmazonEKSWorkerNodePolicy
IAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py
# Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
create_cluster_and_nodegroup = EksCreateClusterOperator(
task_id="create_cluster_and_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
cluster_role_arn=test_context[ROLE_ARN_KEY],
# Opting to use the same ARN for the cluster and the nodegroup here,
# but a different ARN could be configured and passed if desired.
nodegroup_role_arn=test_context[ROLE_ARN_KEY],
resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
# ``compute='nodegroup'`` is the default, explicitly set here for demo purposes.
compute="nodegroup",
# The launch template enforces IMDSv2 and is required for internal
# compliance when running these system tests on AWS infrastructure.
create_nodegroup_kwargs={"launchTemplate": {"name": launch_template_name}},
)
一步创建 Amazon EKS 集群和 AWS Fargate 配置文件¶
要一步创建 Amazon EKS 集群和 AWS Fargate 配置文件,您可以使用 EksCreateClusterOperator
。您还可以通过将 deferrable
参数设置为 True
,在可延迟模式下运行此 Operator。
- 注意:需要具有以下权限的 AWS IAM 角色
ec2.amazon.aws.com
必须在信任关系中eks.amazonaws.com
必须添加到信任关系中AmazonEC2ContainerRegistryReadOnly
IAM Policy 必须附加AmazonEKSClusterPolicy
IAM Policy 必须附加AmazonEKSWorkerNodePolicy
IAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_fargate_in_one_step.py
# Create an Amazon EKS cluster control plane and an AWS Fargate compute platform in one step.
create_cluster_and_fargate_profile = EksCreateClusterOperator(
task_id="create_eks_cluster_and_fargate_profile",
cluster_name=cluster_name,
cluster_role_arn=cluster_role_arn,
resources_vpc_config={
"subnetIds": subnets,
"endpointPublicAccess": True,
"endpointPrivateAccess": False,
},
compute="fargate",
fargate_profile_name=fargate_profile_name,
# Opting to use the same ARN for the cluster and the pod here,
# but a different ARN could be configured and passed if desired.
fargate_pod_execution_role_arn=fargate_pod_role_arn,
)
删除 Amazon EKS 集群¶
要删除现有的 Amazon EKS 集群,您可以使用 EksDeleteClusterOperator
。您还可以通过将 deferrable
参数设置为 True
,在可延迟模式下运行此 Operator。
tests/system/amazon/aws/example_eks_with_nodegroups.py
delete_cluster = EksDeleteClusterOperator(
task_id="delete_cluster",
cluster_name=cluster_name,
)
- 注意:如果集群附加了任何资源,例如 Amazon EKS 节点组或 AWS
Fargate 配置文件,则无法删除集群。使用
force
参数将首先尝试删除任何附加的资源。
tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py
# An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting the `force` to `True` will delete any attached resources before deleting the cluster.
delete_nodegroup_and_cluster = EksDeleteClusterOperator(
task_id="delete_nodegroup_and_cluster",
cluster_name=cluster_name,
force_delete_compute=True,
)
创建 Amazon EKS 托管节点组¶
要创建 Amazon EKS 托管节点组,您可以使用 EksCreateNodegroupOperator
。您还可以通过将 deferrable
参数设置为 True
,在可延迟模式下运行此 Operator。
- 注意:需要具有以下权限的 AWS IAM 角色
ec2.amazon.aws.com
必须在信任关系中AmazonEC2ContainerRegistryReadOnly
IAM Policy 必须附加AmazonEKSWorkerNodePolicy
IAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_nodegroups.py
create_nodegroup = EksCreateNodegroupOperator(
task_id="create_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
nodegroup_subnets=test_context[SUBNETS_KEY],
nodegroup_role_arn=test_context[ROLE_ARN_KEY],
)
删除 Amazon EKS 托管节点组¶
要删除现有的 Amazon EKS 托管节点组,您可以使用 EksDeleteNodegroupOperator
。您还可以通过将 deferrable
参数设置为 True
,在可延迟模式下运行此 Operator。
tests/system/amazon/aws/example_eks_with_nodegroups.py
delete_nodegroup = EksDeleteNodegroupOperator(
task_id="delete_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
)
创建 AWS Fargate 配置文件¶
要创建 AWS Fargate 配置文件,您可以使用 EksCreateFargateProfileOperator
。
- 注意:需要具有以下权限的 AWS IAM 角色
ec2.amazon.aws.com
必须在信任关系中AmazonEC2ContainerRegistryReadOnly
IAM Policy 必须附加AmazonEKSWorkerNodePolicy
IAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_fargate_profile.py
create_fargate_profile = EksCreateFargateProfileOperator(
task_id="create_eks_fargate_profile",
cluster_name=cluster_name,
pod_execution_role_arn=fargate_pod_role_arn,
fargate_profile_name=fargate_profile_name,
selectors=SELECTORS,
)
删除 AWS Fargate 配置文件¶
要删除现有的 AWS Fargate 配置文件,您可以使用 EksDeleteFargateProfileOperator
。
tests/system/amazon/aws/example_eks_with_fargate_profile.py
delete_fargate_profile = EksDeleteFargateProfileOperator(
task_id="delete_eks_fargate_profile",
cluster_name=cluster_name,
fargate_profile_name=fargate_profile_name,
)
在 Amazon EKS 集群上执行任务¶
要在现有的 Amazon EKS 集群上运行 Pod,您可以使用 EksPodOperator
。
注意:需要具有底层计算基础设施的 Amazon EKS 集群。
tests/system/amazon/aws/example_eks_with_nodegroups.py
start_pod = EksPodOperator(
task_id="start_pod",
pod_name="test_pod",
cluster_name=cluster_name,
image="amazon/aws-cli:latest",
cmds=["sh", "-c", "echo Test Airflow; date"],
labels={"demo": "hello_world"},
get_logs=True,
on_finish_action="keep_pod",
)
Sensor¶
等待 Amazon EKS 集群状态¶
要检查 Amazon EKS 集群的状态,直到其达到目标状态或其他终端状态,您可以使用 EksClusterStateSensor
。
tests/system/amazon/aws/example_eks_with_nodegroups.py
await_create_cluster = EksClusterStateSensor(
task_id="await_create_cluster",
cluster_name=cluster_name,
target_state=ClusterStates.ACTIVE,
)
等待 Amazon EKS 托管节点组状态¶
要检查 Amazon EKS 托管节点组的状态,直到其达到目标状态或其他终端状态,您可以使用 EksNodegroupStateSensor
。
tests/system/amazon/aws/example_eks_with_nodegroups.py
await_create_nodegroup = EksNodegroupStateSensor(
task_id="await_create_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
target_state=NodegroupStates.ACTIVE,
)
等待 AWS Fargate 配置文件状态¶
要检查 AWS Fargate 配置文件的状态,直到其达到目标状态或其他终端状态,您可以使用 EksFargateProfileSensor
。
tests/system/amazon/aws/example_eks_with_fargate_profile.py
await_create_fargate_profile = EksFargateProfileStateSensor(
task_id="wait_for_create_fargate_profile",
cluster_name=cluster_name,
fargate_profile_name=fargate_profile_name,
target_state=FargateProfileStates.ACTIVE,
)