Amazon Elastic Kubernetes Service (EKS)

Amazon Elastic Kubernetes Service (Amazon EKS) 是一项托管服务,可让您轻松在 AWS 上运行 Kubernetes,而无需自行建立或维护 Kubernetes 控制平面。Kubernetes 是一个开源系统,用于自动化容器化应用的部署、扩缩和管理。

Airflow 提供了用于创建 EKS 集群和计算基础设施以及与其交互的 Operator。

前置任务

要使用这些 Operator,您必须执行以下几项操作

通用参数

aws_conn_id

引用 Amazon Web Services 连接 ID。如果此参数设置为 None,则使用默认的 boto3 行为,无需查找连接。否则,使用 Connection 中存储的凭据。默认值: aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值代替连接中的值。默认值: None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果您想使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,可以指定此参数。

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 verify。否则,使用指定的值代替连接中的值。默认值: None

botocore_config

提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。

示例,有关参数的更多详细信息,请参阅 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值代替连接中的值。默认值: None

注意

指定一个空字典,{},将覆盖 botocore.config.Config 的连接配置

Operator

创建 Amazon EKS 集群

要创建 Amazon EKS 集群,您可以使用 EksCreateClusterOperator

注意:需要具有以下权限的 AWS IAM 角色

eks.amazonaws.com 必须添加到信任关系中 AmazonEKSClusterPolicy IAM Policy 必须附加

tests/system/amazon/aws/example_eks_with_nodegroups.py

# Create an Amazon EKS Cluster control plane without attaching compute service.
create_cluster = EksCreateClusterOperator(
    task_id="create_cluster",
    cluster_name=cluster_name,
    cluster_role_arn=test_context[ROLE_ARN_KEY],
    resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
    compute=None,
)

一步创建 Amazon EKS 集群和节点组

要一步创建 Amazon EKS 集群和 EKS 托管节点组,您可以使用 EksCreateClusterOperator

注意:需要具有以下权限的 AWS IAM 角色

ec2.amazon.aws.com 必须在信任关系中 eks.amazonaws.com 必须添加到信任关系中 AmazonEC2ContainerRegistryReadOnly IAM Policy 必须附加 AmazonEKSClusterPolicy IAM Policy 必须附加 AmazonEKSWorkerNodePolicy IAM Policy 必须附加

tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py

# Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
create_cluster_and_nodegroup = EksCreateClusterOperator(
    task_id="create_cluster_and_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
    cluster_role_arn=test_context[ROLE_ARN_KEY],
    # Opting to use the same ARN for the cluster and the nodegroup here,
    # but a different ARN could be configured and passed if desired.
    nodegroup_role_arn=test_context[ROLE_ARN_KEY],
    resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
    # ``compute='nodegroup'`` is the default, explicitly set here for demo purposes.
    compute="nodegroup",
    # The launch template enforces IMDSv2 and is required for internal
    # compliance when running these system tests on AWS infrastructure.
    create_nodegroup_kwargs={"launchTemplate": {"name": launch_template_name}},
)

一步创建 Amazon EKS 集群和 AWS Fargate 配置文件

要一步创建 Amazon EKS 集群和 AWS Fargate 配置文件,您可以使用 EksCreateClusterOperator。您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此 Operator。

注意:需要具有以下权限的 AWS IAM 角色

ec2.amazon.aws.com 必须在信任关系中 eks.amazonaws.com 必须添加到信任关系中 AmazonEC2ContainerRegistryReadOnly IAM Policy 必须附加 AmazonEKSClusterPolicy IAM Policy 必须附加 AmazonEKSWorkerNodePolicy IAM Policy 必须附加

tests/system/amazon/aws/example_eks_with_fargate_in_one_step.py

# Create an Amazon EKS cluster control plane and an AWS Fargate compute platform in one step.
create_cluster_and_fargate_profile = EksCreateClusterOperator(
    task_id="create_eks_cluster_and_fargate_profile",
    cluster_name=cluster_name,
    cluster_role_arn=cluster_role_arn,
    resources_vpc_config={
        "subnetIds": subnets,
        "endpointPublicAccess": True,
        "endpointPrivateAccess": False,
    },
    compute="fargate",
    fargate_profile_name=fargate_profile_name,
    # Opting to use the same ARN for the cluster and the pod here,
    # but a different ARN could be configured and passed if desired.
    fargate_pod_execution_role_arn=fargate_pod_role_arn,
)

删除 Amazon EKS 集群

要删除现有的 Amazon EKS 集群,您可以使用 EksDeleteClusterOperator。您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此 Operator。

tests/system/amazon/aws/example_eks_with_nodegroups.py

delete_cluster = EksDeleteClusterOperator(
    task_id="delete_cluster",
    cluster_name=cluster_name,
)
注意:如果集群附加了任何资源,例如 Amazon EKS 节点组或 AWS

Fargate 配置文件,则无法删除集群。使用 force 参数将首先尝试删除任何附加的资源。

tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py

# An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting the `force` to `True` will delete any attached resources before deleting the cluster.
delete_nodegroup_and_cluster = EksDeleteClusterOperator(
    task_id="delete_nodegroup_and_cluster",
    cluster_name=cluster_name,
    force_delete_compute=True,
)

创建 Amazon EKS 托管节点组

要创建 Amazon EKS 托管节点组,您可以使用 EksCreateNodegroupOperator。您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此 Operator。

注意:需要具有以下权限的 AWS IAM 角色

ec2.amazon.aws.com 必须在信任关系中 AmazonEC2ContainerRegistryReadOnly IAM Policy 必须附加 AmazonEKSWorkerNodePolicy IAM Policy 必须附加

tests/system/amazon/aws/example_eks_with_nodegroups.py

create_nodegroup = EksCreateNodegroupOperator(
    task_id="create_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
    nodegroup_subnets=test_context[SUBNETS_KEY],
    nodegroup_role_arn=test_context[ROLE_ARN_KEY],
)

删除 Amazon EKS 托管节点组

要删除现有的 Amazon EKS 托管节点组,您可以使用 EksDeleteNodegroupOperator。您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此 Operator。

tests/system/amazon/aws/example_eks_with_nodegroups.py

delete_nodegroup = EksDeleteNodegroupOperator(
    task_id="delete_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
)

创建 AWS Fargate 配置文件

要创建 AWS Fargate 配置文件,您可以使用 EksCreateFargateProfileOperator

注意:需要具有以下权限的 AWS IAM 角色

ec2.amazon.aws.com 必须在信任关系中 AmazonEC2ContainerRegistryReadOnly IAM Policy 必须附加 AmazonEKSWorkerNodePolicy IAM Policy 必须附加

tests/system/amazon/aws/example_eks_with_fargate_profile.py

create_fargate_profile = EksCreateFargateProfileOperator(
    task_id="create_eks_fargate_profile",
    cluster_name=cluster_name,
    pod_execution_role_arn=fargate_pod_role_arn,
    fargate_profile_name=fargate_profile_name,
    selectors=SELECTORS,
)

删除 AWS Fargate 配置文件

要删除现有的 AWS Fargate 配置文件,您可以使用 EksDeleteFargateProfileOperator

tests/system/amazon/aws/example_eks_with_fargate_profile.py

delete_fargate_profile = EksDeleteFargateProfileOperator(
    task_id="delete_eks_fargate_profile",
    cluster_name=cluster_name,
    fargate_profile_name=fargate_profile_name,
)

在 Amazon EKS 集群上执行任务

要在现有的 Amazon EKS 集群上运行 Pod,您可以使用 EksPodOperator

注意:需要具有底层计算基础设施的 Amazon EKS 集群。

tests/system/amazon/aws/example_eks_with_nodegroups.py

start_pod = EksPodOperator(
    task_id="start_pod",
    pod_name="test_pod",
    cluster_name=cluster_name,
    image="amazon/aws-cli:latest",
    cmds=["sh", "-c", "echo Test Airflow; date"],
    labels={"demo": "hello_world"},
    get_logs=True,
    on_finish_action="keep_pod",
)

Sensor

等待 Amazon EKS 集群状态

要检查 Amazon EKS 集群的状态,直到其达到目标状态或其他终端状态,您可以使用 EksClusterStateSensor

tests/system/amazon/aws/example_eks_with_nodegroups.py

await_create_cluster = EksClusterStateSensor(
    task_id="await_create_cluster",
    cluster_name=cluster_name,
    target_state=ClusterStates.ACTIVE,
)

等待 Amazon EKS 托管节点组状态

要检查 Amazon EKS 托管节点组的状态,直到其达到目标状态或其他终端状态,您可以使用 EksNodegroupStateSensor

tests/system/amazon/aws/example_eks_with_nodegroups.py

await_create_nodegroup = EksNodegroupStateSensor(
    task_id="await_create_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
    target_state=NodegroupStates.ACTIVE,
)

等待 AWS Fargate 配置文件状态

要检查 AWS Fargate 配置文件的状态,直到其达到目标状态或其他终端状态,您可以使用 EksFargateProfileSensor

tests/system/amazon/aws/example_eks_with_fargate_profile.py

await_create_fargate_profile = EksFargateProfileStateSensor(
    task_id="wait_for_create_fargate_profile",
    cluster_name=cluster_name,
    fargate_profile_name=fargate_profile_name,
    target_state=FargateProfileStates.ACTIVE,
)

参考

此条目是否有帮助?