Google Cloud Composer Operators

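Cloud Composer is a fully managed workflow orchestration service that lets you create, schedule, monitor, and manage workflows that span clouds and on-premises data centers.

Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.

By using Cloud Composer instead of a local Apache Airflow instance, you get the best of Airflow with no installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows rather than your infrastructure.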

For more information about the service, visit the Cloud Composer product documentation.

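Create an environment

Before you create a Cloud Composer environment, you need to define it. For more information about the fields you can pass when creating an environment, see the Cloud Composer create environment API.

A simple environment configuration looks like this:

tests/system/google/cloud/composer/example_cloud_composer.py [source]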


ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")

ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2-airflow-2"},
    }
}
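The example builds ENVIRONMENT_ID by replacing underscores with hyphens because the Cloud Composer API only accepts environment IDs that start with a lowercase letter, followed by up to 63 lowercase letters, digits, or hyphens, and that do not end with a hyphen. The sketch below checks an ID against that rule before the operator sends it; make_environment_id is a hypothetical helper, not part of the Google provider, and the argument values are placeholders.

```python
import re

# Rule as described in the Cloud Composer API reference: a lowercase letter,
# then up to 63 lowercase letters, digits or hyphens, not ending in a hyphen.
ENVIRONMENT_ID_PATTERN = re.compile(r"[a-z](?:[-0-9a-z]{0,62}[0-9a-z])?")


def make_environment_id(*parts: str) -> str:
    """Hypothetical helper: join parts into a Composer-safe environment ID."""
    # Lowercase and swap underscores for hyphens, like .replace("_", "-") above.
    env_id = "-".join(parts).lower().replace("_", "-")
    if ENVIRONMENT_ID_PATTERN.fullmatch(env_id) is None:
        raise ValueError(f"invalid Cloud Composer environment ID: {env_id!r}")
    return env_id


# Placeholder values; the system test supplies its own DAG_ID and ENV_ID.
print(make_environment_id("test", "example_composer", "env_1"))
```

Validating early turns a malformed ID into a clear error at DAG parse time instead of a failed API call at run time.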

With this configuration, we can create the environment using CloudComposerCreateEnvironmentOperator:

tests/system/google/cloud/composer/example_cloud_composer.py [source]

create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)

Alternatively, you can run the same operator in deferrable mode by setting deferrable=True: CloudComposerCreateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py [source]

defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    environment=ENVIRONMENT,
    deferrable=True,
)

Get an environment

To get an environment, you can use

CloudComposerGetEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py [source]

get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)
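The Cloud Composer API addresses an environment by its full resource name, projects/{project}/locations/{region}/environments/{environment_id}, which is assembled from the same project_id, region, and environment_id arguments the operator takes. A minimal sketch of building and parsing that name follows; environment_name, parse_environment_name, and the sample values are hypothetical and shown only to make the format concrete.

```python
from typing import NamedTuple


class EnvironmentRef(NamedTuple):
    project_id: str
    region: str
    environment_id: str


def environment_name(project_id: str, region: str, environment_id: str) -> str:
    """Build the full Cloud Composer environment resource name."""
    return f"projects/{project_id}/locations/{region}/environments/{environment_id}"


def parse_environment_name(name: str) -> EnvironmentRef:
    """Split a full resource name back into its three components."""
    parts = name.split("/")
    # Expect exactly: projects/<p>/locations/<r>/environments/<e>
    if len(parts) != 6 or parts[0::2] != ["projects", "locations", "environments"]:
        raise ValueError(f"not an environment resource name: {name!r}")
    return EnvironmentRef(parts[1], parts[3], parts[5])


name = environment_name("my-project", "us-central1", "test-composer-env")
print(name)
print(parse_environment_name(name))
```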

List environments

To list environments, you can use

CloudComposerListEnvironmentsOperator

tests/system/google/cloud/composer/example_cloud_composer.py [source]

list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs", project_id=PROJECT_ID, region=REGION
)
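A downstream task may want to act only on some of the listed environments. Assuming the operator's return value (available to later tasks through XCom) is a list of environment dictionaries with name and labels keys, a minimal filtering sketch could look like this; names_with_label and the sample data are hypothetical.

```python
from typing import Any


def names_with_label(
    environments: list[dict[str, Any]], key: str, value: str
) -> list[str]:
    """Return the names of environments whose labels contain key=value."""
    return [
        env["name"]
        for env in environments
        # Environments without a labels entry are skipped instead of raising.
        if env.get("labels", {}).get(key) == value
    ]


# Hypothetical sample shaped like a list of environment dictionaries.
sample = [
    {"name": "projects/p/locations/us-central1/environments/a", "labels": {"label": "testing"}},
    {"name": "projects/p/locations/us-central1/environments/b", "labels": {}},
    {"name": "projects/p/locations/us-central1/environments/c"},
]
print(names_with_label(sample, "label", "testing"))
```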

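Update an environment

You can update an environment by providing an environment configuration and an updateMask (the operator's update_mask argument). In updateMask, list the paths of the fields to update, relative to the Environment resource. For more information about updateMask and the other parameters, see the Cloud Composer update environment API.

An example of a new environment configuration and its updateMask:

tests/system/google/cloud/composer/example_cloud_composer.py [source]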

UPDATED_ENVIRONMENT = {
    "labels": {
        "label": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label"]}
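Each path in UPDATE_MASK names a field of the environment dictionary, with nested keys joined by dots, so labels.label targets only the label key inside labels. To keep the mask and the update dictionary in sync, a hypothetical helper can derive one from the other. This sketch only mirrors the dictionary's structure; it does not check which paths the update API actually supports, so consult the update environment API before relying on a generated path.

```python
from typing import Any


def build_update_mask(update: dict[str, Any], prefix: str = "") -> dict[str, list[str]]:
    """Hypothetical helper: turn a nested update dict into an updateMask."""
    paths: list[str] = []
    for key, value in update.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Recurse into non-empty dicts so each leaf gets its own path.
            paths.extend(build_update_mask(value, path)["paths"])
        else:
            paths.append(path)
    return {"paths": paths}


UPDATED_ENVIRONMENT = {"labels": {"label": "testing"}}
print(build_update_mask(UPDATED_ENVIRONMENT))
```

For the example configuration this yields the same mask as UPDATE_MASK.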

To update the environment, you can use CloudComposerUpdateEnvironmentOperator:

tests/system/google/cloud/composer/example_cloud_composer.py [source]

update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)

Alternatively, you can run the same operator in deferrable mode by setting deferrable=True: CloudComposerUpdateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py [source]

defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)

Delete an environment

To delete an environment, you can use

CloudComposerDeleteEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py [source]

delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)

Alternatively, you can run the same operator in deferrable mode by setting deferrable=True: CloudComposerDeleteEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py [source]

defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    deferrable=True,
)

List Composer images

You can also list all supported Cloud Composer image versions with

CloudComposerListImageVersionsOperator

tests/system/google/cloud/composer/example_cloud_composer.py [source]

image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)
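Image version IDs combine a Composer version and an Airflow version, as in the composer-2-airflow-2 value used in the environment configuration. Assuming the operator's result is a list of dictionaries with an image_version_id key, and that fully specified IDs look like composer-X.Y.Z-airflow-A.B.C, a sketch that picks the newest one could look like this. latest_image_version is hypothetical, the sample IDs are made up, and IDs that do not fit the assumed pattern are ignored.

```python
import re
from typing import Any, Optional

# Assumed ID shape: composer-<dotted version>-airflow-<dotted version>.
IMAGE_ID = re.compile(r"composer-(\d+(?:\.\d+)*)-airflow-(\d+(?:\.\d+)*)")


def _as_tuple(version: str) -> tuple[int, ...]:
    """Turn '2.9.7' into (2, 9, 7) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def latest_image_version(image_versions: list[dict[str, Any]]) -> Optional[str]:
    """Return the ID with the highest (Composer, Airflow) version, or None."""
    best_key = None
    best_id = None
    for image in image_versions:
        image_id = image.get("image_version_id", "")
        match = IMAGE_ID.fullmatch(image_id)
        if match is None:
            continue  # Skip IDs that don't fit the assumed shape.
        key = (_as_tuple(match.group(1)), _as_tuple(match.group(2)))
        if best_key is None or key > best_key:
            best_key, best_id = key, image_id
    return best_id


# Made-up sample shaped like the assumed operator output.
sample = [
    {"image_version_id": "composer-2.8.1-airflow-2.7.3"},
    {"image_version_id": "composer-2.9.7-airflow-2.9.3"},
    {"image_version_id": "composer-2.9.7-airflow-2.6.3"},
    {"image_version_id": "not-an-image"},
]
print(latest_image_version(sample))
```

Comparing integer tuples rather than strings avoids ranking 2.10 below 2.9.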

Run Airflow CLI commands

You can run Airflow CLI commands in your environment with CloudComposerRunAirflowCLICommandOperator:

tests/system/google/cloud/composer/example_cloud_composer.py [source]

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    command=COMMAND,
)
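The operator takes the CLI command as a single string (COMMAND, defined elsewhere in the example file), while the Cloud Composer executeAirflowCommand API receives it as separate command, subcommand, and parameters fields. The sketch below shows one way such a string could be split; it is an illustration under that assumption, not the operator's own parsing code, and split_airflow_command is hypothetical.

```python
import shlex
from typing import Any


def split_airflow_command(cmd: str) -> dict[str, Any]:
    """Hypothetical: split 'dags list -o json' into API-style fields."""
    # shlex keeps quoted arguments such as "my dag" together.
    args = shlex.split(cmd)
    if not args:
        raise ValueError("empty Airflow CLI command")
    return {
        "command": args[0],
        "subcommand": args[1] if len(args) > 1 else "",
        "parameters": args[2:],
    }


print(split_airflow_command("dags list -o json"))
```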

Alternatively, you can run the same operator in deferrable mode by setting deferrable=True:

tests/system/google/cloud/composer/example_cloud_composer.py [source]

defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="defer_run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    command=COMMAND,
    deferrable=True,
)

Check whether a DAG run has finished

You can use a sensor to check whether a DAG run in your environment has finished, with CloudComposerDAGRunSensor:

tests/system/google/cloud/composer/example_cloud_composer.py [source]

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
)
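Conceptually, the sensor keeps poking until the runs it finds for composer_dag_id have reached one of the allowed_states. The pure-Python sketch below illustrates that kind of check on a hypothetical list of run records with logical_date and state fields inside a time window; the record format, the window, and runs_finished itself are assumptions, not the sensor's exact implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Iterable


def runs_finished(
    dag_runs: list[dict[str, Any]],
    allowed_states: Iterable[str],
    start: datetime,
    end: datetime,
) -> bool:
    """True if some run falls in [start, end] and every such run is allowed."""
    allowed = set(allowed_states)
    in_window = [
        run
        for run in dag_runs
        if start <= datetime.fromisoformat(run["logical_date"]) <= end
    ]
    # An empty window means there is nothing to confirm yet, so keep poking.
    return bool(in_window) and all(run["state"] in allowed for run in in_window)


now = datetime(2024, 1, 2, tzinfo=timezone.utc)
runs = [
    {"logical_date": "2024-01-01T12:00:00+00:00", "state": "success"},
    {"logical_date": "2023-12-01T00:00:00+00:00", "state": "failed"},
]
print(runs_finished(runs, ["success"], now - timedelta(days=1), now))
```

Returning False for an empty window mirrors how a sensor should wait rather than succeed when no matching run exists yet.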

Alternatively, you can run the same sensor in deferrable mode by setting deferrable=True:

tests/system/google/cloud/composer/example_cloud_composer.py [source]

defer_dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="defer_dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
    deferrable=True,
)
