Google Cloud Composer Operators

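Cloud Composer is a fully managed workflow orchestration service. It lets you create, schedule, monitor, and manage workflows that span clouds and on-premises data centers.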

Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.

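Using Cloud Composer instead of a local Apache Airflow instance gives you the best of Airflow with no installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools. This lets you focus on your workflows rather than your infrastructure.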

For more information about the service, see the Cloud Composer product documentation.

Create an environment

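Before you create a Cloud Composer environment, you need to define it. For more information about the fields you can pass when creating an environment, see the Cloud Composer create environment API.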

A simple environment configuration can look as follows:

tests/system/google/cloud/composer/example_cloud_composer.py


ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")

ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2-airflow-2"},
    }
}
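The example builds `ENVIRONMENT_ID` by joining its parts and replacing underscores with hyphens. DAG IDs often contain `_`, but environment IDs do not allow it. The sketch below wraps that step in a hypothetical helper, `make_environment_id`, and checks the result against a naming rule we assume from the Composer API reference: a lowercase letter first, then up to 63 lowercase letters, digits, or hyphens, with no trailing hyphen. `make_environment` is also hypothetical. It only returns the same minimal body as `ENVIRONMENT`.

```python
import re

# ASSUMPTION: Composer environment IDs must start with a lowercase letter,
# followed by up to 63 lowercase letters, digits or hyphens, and must not
# end with a hyphen (64 characters at most).
ENV_ID_PATTERN = re.compile(r"[a-z](?:[a-z0-9-]{0,62}[a-z0-9])?")


def make_environment_id(prefix: str, dag_id: str, env_id: str) -> str:
    """Hypothetical helper: join the parts and swap "_" for "-" like the example."""
    environment_id = f"{prefix}-{dag_id}-{env_id}".replace("_", "-")
    # Fail early instead of letting the create request be rejected later.
    if ENV_ID_PATTERN.fullmatch(environment_id) is None:
        raise ValueError(f"invalid Composer environment ID: {environment_id!r}")
    return environment_id


def make_environment(image_version: str = "composer-2-airflow-2") -> dict:
    """Hypothetical helper: minimal environment body that only pins the image."""
    return {"config": {"software_config": {"image_version": image_version}}}
```

Note that uppercase characters are not lowercased here. They are reported as an error, so the helper never silently produces an ID that differs from what you asked for.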

With this configuration, you can create the environment with CloudComposerCreateEnvironmentOperator:

tests/system/google/cloud/composer/example_cloud_composer.py

create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)

Alternatively, you can define the same operator in deferrable mode: CloudComposerCreateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    environment=ENVIRONMENT,
    deferrable=True,
)

Get an environment

To get an environment, use CloudComposerGetEnvironmentOperator:

tests/system/google/cloud/composer/example_cloud_composer.py

get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)
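The operator identifies the environment by project, region, and environment ID. In the Cloud Composer API, these combine into a resource name of the form `projects/{project}/locations/{region}/environments/{environment}`. Assume the operator's result is available downstream as a dict, for example through XCom (check this against your provider version). The hypothetical helpers below build that name and read `config.airflow_uri`, the URL of the environment's Airflow web UI.

```python
from typing import Optional


def environment_name(project_id: str, region: str, environment_id: str) -> str:
    """Hypothetical helper: full Cloud Composer resource name of an environment."""
    return f"projects/{project_id}/locations/{region}/environments/{environment_id}"


def airflow_web_ui(environment: dict) -> Optional[str]:
    """Hypothetical helper: config.airflow_uri from an environment dict, or None."""
    return environment.get("config", {}).get("airflow_uri")
```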

List environments

To list environments, use CloudComposerListEnvironmentsOperator:

tests/system/google/cloud/composer/example_cloud_composer.py

list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs", project_id=PROJECT_ID, region=REGION
)
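Assume the listed environments arrive downstream as a list of dicts with a `name` key and an optional `labels` key. This is an assumption, so check the operator's return value in your provider version. Under that assumption, a hypothetical helper such as `environment_ids` can extract the short environment IDs and optionally keep only environments that carry given labels.

```python
from typing import Dict, List, Optional


def environment_ids(
    environments: List[dict], required_labels: Optional[Dict[str, str]] = None
) -> List[str]:
    """Hypothetical helper: short IDs of environments whose labels all match."""
    required = required_labels or {}
    ids = []
    for env in environments:
        labels = env.get("labels", {})
        if all(labels.get(key) == value for key, value in required.items()):
            # The ID is the last segment of projects/.../environments/<id>.
            ids.append(env["name"].rsplit("/", 1)[-1])
    return ids
```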

Update an environment

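You can update an environment by providing a new environment configuration and an updateMask, which the operator takes as update_mask. In updateMask, you list the paths of the fields to update, relative to the environment. For more information about updateMask and the other parameters, see the Cloud Composer update environment API.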

An example of a new environment configuration and its updateMask:

tests/system/google/cloud/composer/example_cloud_composer.py

UPDATED_ENVIRONMENT = {
    "labels": {
        "label": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label"]}
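Each entry in `paths` is a dot-separated path into the environment body, so `labels.label` points at the `label` key inside `labels`. The hypothetical `build_update_mask` helper below derives such paths from a nested dict of changes. Treat it as an illustration only. The update API accepts only a documented set of mask paths, so a derived path is not guaranteed to be valid.

```python
from typing import Dict, List


def build_update_mask(changes: dict, prefix: str = "") -> Dict[str, List[str]]:
    """Hypothetical helper: one dot-separated path per leaf of a nested dict."""
    paths: List[str] = []
    for key, value in changes.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Recurse into non-empty dicts; an empty dict is kept as a leaf path.
            paths.extend(build_update_mask(value, path)["paths"])
        else:
            paths.append(path)
    return {"paths": paths}
```

Applied to `UPDATED_ENVIRONMENT` from the example, this yields the same mask as `UPDATE_MASK`.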

To update an environment, use CloudComposerUpdateEnvironmentOperator:

tests/system/google/cloud/composer/example_cloud_composer.py

update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)

Alternatively, you can define the same operator in deferrable mode: CloudComposerUpdateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)

Delete an environment

To delete an environment, use CloudComposerDeleteEnvironmentOperator:

tests/system/google/cloud/composer/example_cloud_composer.py

delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)

Alternatively, you can define the same operator in deferrable mode: CloudComposerDeleteEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    deferrable=True,
)

List Composer images

You can also list all supported Cloud Composer images with CloudComposerListImageVersionsOperator:

tests/system/google/cloud/composer/example_cloud_composer.py

image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)
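Explicit image IDs generally look like `composer-<composer version>-airflow-<airflow version>`. As we understand it, aliases such as `composer-2-airflow-2` (used in the environment configuration earlier) resolve to a current build. Suppose you have a list of image version ID strings, for instance taken from this operator's result. The hypothetical `latest_image` helper below picks the newest explicit image for a given Airflow major version. It only understands the assumed `composer-X.Y.Z-airflow-A.B.C` shape and skips aliases and any other format.

```python
import re
from typing import Iterable, Optional, Tuple

# ASSUMPTION: explicit image IDs look like composer-X.Y.Z-airflow-A.B.C.
IMAGE_RE = re.compile(r"composer-(\d+\.\d+\.\d+)-airflow-(\d+\.\d+\.\d+)")


def _version(text: str) -> Tuple[int, ...]:
    """Turn "2.10.0" into (2, 10, 0) so versions compare numerically."""
    return tuple(int(part) for part in text.split("."))


def latest_image(image_ids: Iterable[str], airflow_major: int) -> Optional[str]:
    """Hypothetical helper: newest explicit image for one Airflow major version."""
    best, best_key = None, None
    for image_id in image_ids:
        match = IMAGE_RE.fullmatch(image_id)
        if match is None:
            continue  # aliases like composer-2-airflow-2 and other shapes are skipped
        key = (_version(match.group(1)), _version(match.group(2)))
        if key[1][0] != airflow_major:
            continue
        if best_key is None or key > best_key:
            best, best_key = image_id, key
    return best
```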

Run Airflow CLI commands

You can run Airflow CLI commands in your environment with CloudComposerRunAirflowCLICommandOperator:

tests/system/google/cloud/composer/example_cloud_composer.py

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    command=COMMAND,
)
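`COMMAND` is not defined in this excerpt. It is a single string, such as `dags list -o json --verbose` (an assumed value). As far as we know, the Composer `executeAirflowCommand` API takes the command, subcommand, and parameters as separate fields. The hypothetical `split_cli_command` helper below shows one naive way to split such a string. It is not the operator's own parsing logic.

```python
import shlex
from typing import List, Tuple


def split_cli_command(command: str) -> Tuple[str, str, List[str]]:
    """Hypothetical helper: split a CLI string into command, subcommand, params."""
    parts = shlex.split(command)  # respects quoting, e.g. "a b" stays one token
    if not parts:
        raise ValueError("command must not be empty")
    rest = parts[1:]
    subcommand = ""
    # Naive rule: the second token is a subcommand unless it looks like a flag.
    if rest and not rest[0].startswith("-"):
        subcommand, rest = rest[0], rest[1:]
    return parts[0], subcommand, rest
```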

Alternatively, you can define the same operator in deferrable mode:

tests/system/google/cloud/composer/example_cloud_composer.py

defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="defer_run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    command=COMMAND,
    deferrable=True,
)

Check whether a DAG run has completed

To check whether a DAG run in your environment has completed, use the CloudComposerDAGRunSensor sensor:

tests/system/google/cloud/composer/example_cloud_composer.py

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
)
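With `allowed_states=["success"]`, the sensor succeeds once the DAG runs it inspects are in an allowed state. The function below is a simplified, hypothetical model of that check. The name `runs_in_allowed_states`, the `execution_date`/`state` dict keys, and the explicit time window are all assumptions. The real sensor's rules for choosing which runs to inspect may differ.

```python
from datetime import datetime
from typing import Iterable, List


def runs_in_allowed_states(
    dag_runs: List[dict],
    allowed_states: Iterable[str],
    window_start: datetime,
    window_end: datetime,
) -> bool:
    """Hypothetical check: some run falls in the window and all such runs are allowed."""
    allowed = set(allowed_states)
    in_window = [
        run for run in dag_runs
        if window_start <= run["execution_date"] <= window_end
    ]
    # No runs in the window means there is nothing to confirm yet.
    return bool(in_window) and all(run["state"] in allowed for run in in_window)
```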

Alternatively, you can define the same sensor in deferrable mode:

tests/system/google/cloud/composer/example_cloud_composer.py

defer_dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="defer_dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
    deferrable=True,
)
