Google Cloud Run 操作符

Cloud Run 用于在完全托管的平台上构建和部署用任何语言(包括 Go、Python、Java、Node.js、.NET 和 Ruby)编写的可扩展容器化应用程序。

有关此服务的更多信息,请访问 Google Cloud Run 文档

先决条件任务

要使用这些操作符,您必须执行以下操作:

创建作业

在 Cloud Run 中创建作业之前,您需要定义它。有关 Job 对象字段的更多信息,请访问 Google Cloud Run Job 描述

可以使用 Job 对象创建简单的作业配置

tests/system/google/cloud/cloud_run/example_cloud_run.py[源代码]

def _create_job_instance() -> Job:
    """
    Create a Cloud Run job configuration with google.cloud.run_v2.Job object.

    As a minimum the configuration must contain a container image name in its template.
    The rest of the configuration parameters are optional and will be populated with default values if not set.
    """
    job = Job()
    container = k8s_min.Container()
    container.image = "us-docker.pkg.dev/cloudrun/container/job:latest"
    container.resources.limits = {"cpu": "2", "memory": "1Gi"}
    job.template.template.containers.append(container)
    return job


或使用 Python 字典

tests/system/google/cloud/cloud_run/example_cloud_run.py[源代码]

def _create_job_dict() -> dict:
    """
    Create a Cloud Run job configuration with a Python dict.

    As a minimum the configuration must contain a container image name in its template.
    """
    return {
        "template": {
            "template": {
                "containers": [
                    {
                        "image": "us-docker.pkg.dev/cloudrun/container/job:latest",
                        "resources": {
                            "limits": {"cpu": "1", "memory": "512Mi"},
                            "cpu_idle": False,
                            "startup_cpu_boost": False,
                        },
                        "name": "",
                        "command": [],
                        "args": [],
                        "env": [],
                        "ports": [],
                        "volume_mounts": [],
                        "working_dir": "",
                        "depends_on": [],
                    }
                ],
                "volumes": [],
                "execution_environment": 0,
                "encryption_key": "",
            },
            "labels": {},
            "annotations": {},
            "parallelism": 0,
            "task_count": 0,
        },
        "name": "",
        "uid": "",
        "generation": "0",
        "labels": {},
        "annotations": {},
        "creator": "",
        "last_modifier": "",
        "client": "",
        "client_version": "",
        "launch_stage": 0,
        "observed_generation": "0",
        "conditions": [],
        "execution_count": 0,
        "reconciling": False,
        "satisfies_pzs": False,
        "etag": "",
    }


您可以使用以下任何一种配置创建 Cloud Run 作业:CloudRunCreateJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[源代码]

create1 = CloudRunCreateJobOperator(
    task_id=create1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_instance(),
    dag=dag,
)

请注意,此操作符仅创建作业而不执行它。作业的字典表示形式被推送到 XCom。

创建服务

在 Cloud Run 中创建服务之前,您需要定义它。有关 Service 对象字段的更多信息,请访问 Google Cloud Run Service 描述

一个简单的服务配置如下所示

tests/system/google/cloud/cloud_run/example_cloud_run_service.py[源代码]

def _create_service():
    service = Service()
    container = k8s_min.Container()
    container.image = "us-docker.pkg.dev/cloudrun/container/placeholder:latest"
    service.template.containers.append(container)
    return service


使用此配置,我们可以创建服务:CloudRunCreateServiceOperator

tests/system/google/cloud/cloud_run/example_cloud_run_service.py[源代码]

create_cloud_run_service = CloudRunCreateServiceOperator(
    task_id="create-cloud-run-service",
    project_id=PROJECT_ID,
    region="us-central1",
    service=_create_service(),
    service_name="cloudrun-system-test-service",
)

请注意,此操作符仅创建服务而不执行它。服务的字典表示形式被推送到 XCom。

删除服务

使用此配置,我们可以删除服务:CloudRunDeleteServiceOperator

tests/system/google/cloud/cloud_run/example_cloud_run_service.py[源代码]

delete_cloud_run_service = CloudRunDeleteServiceOperator(
    task_id="delete-cloud-run-service",
    project_id=PROJECT_ID,
    region="us-central1",
    service_name="cloudrun-system-test-service",
    dag=dag,
)

请注意,此操作符会等待服务被删除,并且已删除的服务的字典表示形式会被推送到 XCom。

执行作业

要执行作业,可以使用

CloudRunExecuteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[源代码]

execute1 = CloudRunExecuteJobOperator(
    task_id=execute1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    deferrable=False,
)

或者,您可以在可延迟模式下定义相同的操作符

CloudRunExecuteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[源代码]

execute2 = CloudRunExecuteJobOperator(
    task_id=execute2_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job2_name,
    dag=dag,
    deferrable=True,
)

您还可以指定覆盖,以便为作业提供新的入口点命令等

CloudRunExecuteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[源代码]

overrides = {
    "container_overrides": [
        {
            "name": "job",
            "args": ["python", "main.py"],
            "env": [{"name": "ENV_VAR", "value": "value"}],
            "clear_args": False,
        }
    ],
    "task_count": 1,
    "timeout": "60s",
}

execute3 = CloudRunExecuteJobOperator(
    task_id=execute3_task_name,
    project_id=PROJECT_ID,
    region=region,
    overrides=overrides,
    job_name=job3_name,
    dag=dag,
    deferrable=False,
)

更新作业

要更新作业,可以使用

CloudRunUpdateJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[源代码]

update_job1 = CloudRunUpdateJobOperator(
    task_id=update_job1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_instance_with_label(),
    dag=dag,
)

作业的字典表示形式被推送到 XCom。

列出作业

要列出作业,可以使用

CloudRunListJobsOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[源代码]

list_jobs = CloudRunListJobsOperator(
    task_id=list_jobs_task_name, project_id=PROJECT_ID, region=region, dag=dag
)

该操作符接受两个可选参数:“limit” 用于限制返回的任务数,“show_deleted” 用于在结果中包含已删除的作业。

删除作业

要删除作业,可以使用

CloudRunDeleteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[源代码]

delete_job1 = CloudRunDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)

请注意,此操作符会等待作业被删除,并且已删除的作业的字典表示形式会被推送到 XCom。

此条目是否有帮助?