AWS Batch

AWS Batch 使您能够在 AWS 云上运行批量计算工作负载。批量计算是开发人员、科学家和工程师访问大量计算资源的常用方式。AWS Batch 消除了配置和管理所需基础设施的繁重工作。

前提条件任务

要使用这些运算符,您需要做几件事:

通用参数

aws_conn_id

Amazon Web Services 连接 ID 的引用。如果此参数设置为 None,则使用默认的 boto3 行为,而不进行连接查找。否则使用连接中存储的凭据。默认值:aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则将使用 AWS Connection Extra Parameter 中的 region_name。否则使用指定的值而不是连接值。默认值:None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果您想使用与 botocore 使用的不同的 CA 证书包,可以指定此参数。

如果此参数设置为 None 或省略,则将使用 AWS Connection Extra Parameter 中的 verify。否则使用指定的值而不是连接值。默认值:None

botocore_config

提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免限制异常、超时等。

示例,有关参数的更多详细信息,请参阅 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则将使用 AWS Connection Extra Parameter 中的 config_kwargs。否则使用指定的值而不是连接值。默认值:None

注意

指定一个空字典 {} 将覆盖 botocore.config.Config 的连接配置。

运算符

提交新的 AWS Batch 作业

要提交新的 AWS Batch 作业并监控其直到达到终止状态,您可以使用 BatchOperator

tests/system/amazon/aws/example_batch.py

submit_batch_job = BatchOperator(
    task_id="submit_batch_job",
    job_name=batch_job_name,
    job_queue=batch_job_queue_name,
    job_definition=batch_job_definition_name,
    container_overrides=JOB_OVERRIDES,
)

创建 AWS Batch 计算环境

要创建新的 AWS Batch 计算环境,您可以使用 BatchCreateComputeEnvironmentOperator

tests/system/amazon/aws/example_batch.py

create_compute_environment = BatchCreateComputeEnvironmentOperator(
    task_id="create_compute_environment",
    compute_environment_name=batch_job_compute_environment_name,
    environment_type="MANAGED",
    state="ENABLED",
    compute_resources={
        "type": "FARGATE",
        "maxvCpus": 10,
        "securityGroupIds": security_groups,
        "subnets": subnets,
    },
)

传感器

等待 AWS Batch 作业状态

要等待 AWS Batch 作业状态直到达到终止状态,您可以使用 BatchSensor

tests/system/amazon/aws/example_batch.py

wait_for_batch_job = BatchSensor(
    task_id="wait_for_batch_job",
    job_id=submit_batch_job.output,
)

为了异步监控 AWS Batch 作业的状态,请使用 BatchSensor,并将参数 deferrable 设置为 True。

由于这将释放 Airflow worker 插槽,因此可以有效利用 Airflow 部署中的可用资源。这还需要在您的 Airflow 部署中提供触发器组件。

等待 AWS Batch 计算环境状态

要等待 AWS Batch 计算环境状态直到达到终止状态,您可以使用 BatchComputeEnvironmentSensor

tests/system/amazon/aws/example_batch.py

wait_for_compute_environment_valid = BatchComputeEnvironmentSensor(
    task_id="wait_for_compute_environment_valid",
    compute_environment=batch_job_compute_environment_name,
)

等待 AWS Batch 作业队列状态

要等待 AWS Batch 作业队列状态直到达到终止状态,您可以使用 BatchJobQueueSensor

tests/system/amazon/aws/example_batch.py

wait_for_job_queue_valid = BatchJobQueueSensor(
    task_id="wait_for_job_queue_valid",
    job_queue=batch_job_queue_name,
)

参考资料

此条目是否有用?