Google Cloud Batch 操作符¶
Cloud Batch 是一项完全托管的批处理服务,用于在 Google 的基础设施上调度、排队和执行批处理作业。
有关此服务的更多信息,请访问 Google Cloud Batch 文档。
前置任务¶
要使用这些操作符,您必须完成以下几项任务
使用 Cloud Console 选择或创建 Cloud Platform 项目。
为您的项目启用结算功能,如 Google Cloud 文档中所述。
启用 API,如 Cloud Console 文档中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅安装。
提交作业¶
在 Cloud Batch 中提交作业之前,您需要定义它。有关 Job 对象字段的更多信息,请访问 Google Cloud Batch 作业说明。
一个简单的作业配置如下所示
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
def _create_job():
runnable = batch_v1.Runnable()
runnable.container = batch_v1.Runnable.Container()
runnable.container.image_uri = "gcr.io/google-containers/busybox"
runnable.container.entrypoint = "/bin/sh"
runnable.container.commands = [
"-c",
"echo Hello world! This is task ${BATCH_TASK_INDEX}.\
This job has a total of ${BATCH_TASK_COUNT} tasks.",
]
task = batch_v1.TaskSpec()
task.runnables = [runnable]
resources = batch_v1.ComputeResource()
resources.cpu_milli = 2000
resources.memory_mib = 16
task.compute_resource = resources
task.max_retry_count = 2
group = batch_v1.TaskGroup()
group.task_count = 2
group.task_spec = task
policy = batch_v1.AllocationPolicy.InstancePolicy()
policy.machine_type = "e2-standard-4"
instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
allocation_policy = batch_v1.AllocationPolicy()
allocation_policy.instances = [instances]
job = batch_v1.Job()
job.task_groups = [group]
job.allocation_policy = allocation_policy
job.labels = {"env": "testing", "type": "container"}
job.logs_policy = batch_v1.LogsPolicy()
job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
return job
使用此配置,我们可以提交作业:CloudBatchSubmitJobOperator
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
submit1 = CloudBatchSubmitJobOperator(
task_id="submit-job1",
project_id=PROJECT_ID,
region=REGION,
job_name=job1_name,
job=_create_job(),
dag=dag,
deferrable=False,
)
或者您可以在可延迟模式下定义相同的操作符:CloudBatchSubmitJobOperator
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
submit2 = CloudBatchSubmitJobOperator(
task_id="submit-job2",
project_id=PROJECT_ID,
region=REGION,
job_name=job2_name,
job=batch_v1.Job.to_dict(_create_job()),
dag=dag,
deferrable=True,
)
请注意,此操作符会等待作业完成执行,并且作业的字典表示形式会推送到 XCom。
列出作业的任务¶
要列出某个作业的任务,您可以使用
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
list_tasks = CloudBatchListTasksOperator(
task_id=list_tasks_task_name, project_id=PROJECT_ID, region=REGION, job_name=job1_name, dag=dag
)
此操作符接受两个可选参数:“limit”用于限制返回的任务数量,“filter”用于仅列出与过滤器匹配的任务。
列出作业¶
要列出作业,您可以使用
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
list_jobs = CloudBatchListJobsOperator(
task_id=list_jobs_task_name,
project_id=PROJECT_ID,
region=REGION,
limit=10,
filter=f"name:projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name_prefix}*",
dag=dag,
)
此操作符接受两个可选参数:“limit”用于限制返回的任务数量,“filter”用于仅列出与过滤器匹配的任务。
删除作业¶
要删除作业,您可以使用
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
delete_job1 = CloudBatchDeleteJobOperator(
task_id="delete-job1",
project_id=PROJECT_ID,
region=REGION,
job_name=job1_name,
dag=dag,
trigger_rule=TriggerRule.ALL_DONE,
)
请注意,此操作符会等待作业被删除,并且已删除作业的字典表示形式会推送到 XCom。