AWS Batch¶
AWS Batch 使您能够在 AWS 云上运行批处理计算工作负载。批处理计算是开发人员、科学家和工程师访问大量计算资源的常用方式。AWS Batch 消除了配置和管理所需基础设施的无差别繁重工作。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参考 Airflow® 的安装
设置连接.
操作符¶
提交新的 AWS Batch 作业¶
要提交新的 AWS Batch 作业并监控它直到达到终端状态,您可以使用 BatchOperator
。
tests/system/amazon/aws/example_batch.py
submit_batch_job = BatchOperator(
task_id="submit_batch_job",
job_name=batch_job_name,
job_queue=batch_job_queue_name,
job_definition=batch_job_definition_name,
container_overrides=JOB_OVERRIDES,
)
创建 AWS Batch 计算环境¶
要创建新的 AWS Batch 计算环境,您可以使用 BatchCreateComputeEnvironmentOperator
。
tests/system/amazon/aws/example_batch.py
create_compute_environment = BatchCreateComputeEnvironmentOperator(
task_id="create_compute_environment",
compute_environment_name=batch_job_compute_environment_name,
environment_type="MANAGED",
state="ENABLED",
compute_resources={
"type": "FARGATE",
"maxvCpus": 10,
"securityGroupIds": security_groups,
"subnets": subnets,
},
)
传感器¶
等待 AWS Batch 作业状态¶
要等待 AWS Batch 作业的状态,直到它达到终端状态,您可以使用 BatchSensor
。
tests/system/amazon/aws/example_batch.py
wait_for_batch_job = BatchSensor(
task_id="wait_for_batch_job",
job_id=submit_batch_job.output,
)
为了异步监控 AWS Batch 作业的状态,请使用参数 deferrable
设置为 True 的 BatchSensor
。
由于这将释放 Airflow 工作线程槽,因此可以有效利用 Airflow 部署上可用的资源。 这还需要在您的 Airflow 部署中提供触发器组件。
等待 AWS Batch 计算环境状态¶
要等待 AWS Batch 计算环境的状态,直到它达到终端状态,您可以使用 BatchComputeEnvironmentSensor
。
tests/system/amazon/aws/example_batch.py
wait_for_compute_environment_valid = BatchComputeEnvironmentSensor(
task_id="wait_for_compute_environment_valid",
compute_environment=batch_job_compute_environment_name,
)
等待 AWS Batch 作业队列状态¶
要等待 AWS Batch 作业队列的状态,直到它达到终端状态,您可以使用 BatchJobQueueSensor
。
tests/system/amazon/aws/example_batch.py
wait_for_job_queue_valid = BatchJobQueueSensor(
task_id="wait_for_job_queue_valid",
job_queue=batch_job_queue_name,
)