AWS Batch

AWS Batch 使您能够在 AWS 云上运行批处理计算工作负载。批处理计算是开发人员、科学家和工程师访问大量计算资源的常用方法。AWS Batch 消除了配置和管理所需基础设施的无差别繁重工作。

先决条件任务

要使用这些操作器,您需要执行以下操作

操作器

提交新的 AWS Batch 作业

要提交新的 AWS Batch 作业并监控它直到达到最终状态,您可以使用 BatchOperator

tests/system/providers/amazon/aws/example_batch.py[源代码]

submit_batch_job = BatchOperator(
    task_id="submit_batch_job",
    job_name=batch_job_name,
    job_queue=batch_job_queue_name,
    job_definition=batch_job_definition_name,
    overrides=JOB_OVERRIDES,
)

创建 AWS Batch 计算环境

要创建新的 AWS Batch 计算环境,您可以使用 BatchCreateComputeEnvironmentOperator

tests/system/providers/amazon/aws/example_batch.py[源代码]

create_compute_environment = BatchCreateComputeEnvironmentOperator(
    task_id="create_compute_environment",
    compute_environment_name=batch_job_compute_environment_name,
    environment_type="MANAGED",
    state="ENABLED",
    compute_resources={
        "type": "FARGATE",
        "maxvCpus": 10,
        "securityGroupIds": security_groups,
        "subnets": subnets,
    },
)

传感器

等待 AWS Batch 作业状态

要等待 AWS Batch 作业的状态,直到它达到最终状态,您可以使用 BatchSensor

tests/system/providers/amazon/aws/example_batch.py[源代码]

wait_for_batch_job = BatchSensor(
    task_id="wait_for_batch_job",
    job_id=submit_batch_job.output,
)

为了异步监控 AWS Batch 作业的状态,请使用 BatchSensor 并将参数 deferrable 设置为 True。

由于这将释放 Airflow 工作器插槽,因此将有效利用 Airflow 部署上的可用资源。这也需要在您的 Airflow 部署中提供触发器组件。

等待 AWS Batch 计算环境状态

要等待 AWS Batch 计算环境的状态,直到它达到最终状态,您可以使用 BatchComputeEnvironmentSensor

tests/system/providers/amazon/aws/example_batch.py[源代码]

wait_for_compute_environment_valid = BatchComputeEnvironmentSensor(
    task_id="wait_for_compute_environment_valid",
    compute_environment=batch_job_compute_environment_name,
)

等待 AWS Batch 作业队列状态

要等待 AWS Batch 作业队列的状态,直到它达到最终状态,您可以使用 BatchJobQueueSensor

tests/system/providers/amazon/aws/example_batch.py[源代码]

wait_for_job_queue_valid = BatchJobQueueSensor(
    task_id="wait_for_job_queue_valid",
    job_queue=batch_job_queue_name,
)

此条目有帮助吗?