Google Cloud Batch 操作符¶
Cloud Batch 是一种完全托管的批处理服务,用于在 Google 的基础设施上调度、排队和执行批处理作业。
有关该服务的更多信息,请访问 Google Cloud Batch 文档。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
按照 Google Cloud 文档 中的说明,启用您的项目的结算功能。
按照 Cloud Console 文档 中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
提交作业¶
在 Cloud Batch 中提交作业之前,您需要定义它。有关 Job 对象字段的更多信息,请访问 Google Cloud Batch Job 描述。
一个简单的作业配置如下所示
def _create_job():
runnable = batch_v1.Runnable()
runnable.container = batch_v1.Runnable.Container()
runnable.container.image_uri = "gcr.io/google-containers/busybox"
runnable.container.entrypoint = "/bin/sh"
runnable.container.commands = [
"-c",
"echo Hello world! This is task ${BATCH_TASK_INDEX}.\
This job has a total of ${BATCH_TASK_COUNT} tasks.",
]
task = batch_v1.TaskSpec()
task.runnables = [runnable]
resources = batch_v1.ComputeResource()
resources.cpu_milli = 2000
resources.memory_mib = 16
task.compute_resource = resources
task.max_retry_count = 2
group = batch_v1.TaskGroup()
group.task_count = 2
group.task_spec = task
policy = batch_v1.AllocationPolicy.InstancePolicy()
policy.machine_type = "e2-standard-4"
instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
allocation_policy = batch_v1.AllocationPolicy()
allocation_policy.instances = [instances]
job = batch_v1.Job()
job.task_groups = [group]
job.allocation_policy = allocation_policy
job.labels = {"env": "testing", "type": "container"}
job.logs_policy = batch_v1.LogsPolicy()
job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
return job
使用此配置,我们可以提交作业:CloudBatchSubmitJobOperator
submit1 = CloudBatchSubmitJobOperator(
task_id="submit-job1",
project_id=PROJECT_ID,
region=REGION,
job_name=job1_name,
job=_create_job(),
dag=dag,
deferrable=False,
)
或者您可以在可延迟模式下定义相同的操作符:CloudBatchSubmitJobOperator
submit2 = CloudBatchSubmitJobOperator(
task_id="submit-job2",
project_id=PROJECT_ID,
region=REGION,
job_name=job2_name,
job=batch_v1.Job.to_dict(_create_job()),
dag=dag,
deferrable=True,
)
请注意,此操作符会等待作业完成执行,并且作业的字典表示形式会被推送到 XCom。
列出作业的任务¶
要列出某个作业的任务,您可以使用
list_tasks = CloudBatchListTasksOperator(
task_id=list_tasks_task_name, project_id=PROJECT_ID, region=REGION, job_name=job1_name, dag=dag
)
该操作符接受两个可选参数:“limit”用于限制返回的任务数量,“filter”用于仅列出与筛选器匹配的任务。
列出作业¶
要列出作业,您可以使用
list_jobs = CloudBatchListJobsOperator(
task_id=list_jobs_task_name,
project_id=PROJECT_ID,
region=REGION,
limit=10,
filter=f"name:projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name_prefix}*",
dag=dag,
)
该操作符接受两个可选参数:“limit”用于限制返回的任务数量,“filter”用于仅列出与筛选器匹配的任务。
删除作业¶
要删除作业,您可以使用
delete_job1 = CloudBatchDeleteJobOperator(
task_id="delete-job1",
project_id=PROJECT_ID,
region=REGION,
job_name=job1_name,
dag=dag,
trigger_rule=TriggerRule.ALL_DONE,
)
请注意,此操作符会等待作业被删除,并且已删除作业的字典表示形式会被推送到 XCom。