Google Cloud Composer Operator
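Cloud Composer is a fully managed workflow orchestration service that lets you create, schedule, monitor, and manage workflows that span clouds and on-premises data centers.
Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.
By using Cloud Composer instead of a local instance of Apache Airflow, you get the benefits of Airflow without installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the Airflow web interface and command-line tools, so you can focus on your workflows rather than your infrastructure.
For more information about the service, see the Cloud Composer product documentation.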
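Create an environment
Before you create a Cloud Composer environment, you need to define it. For more information about the fields you can pass when creating an environment, see the Cloud Composer create environment API.
A simple environment configuration can look as follows: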
tests/system/google/cloud/composer/example_cloud_composer.py
ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2-airflow-2"},
    }
}
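The ID expressions above replace underscores with hyphens, presumably because Cloud Composer environment names do not accept underscores. The following is a minimal sketch of that normalization with a sanity check added. The helper name `make_environment_id` is hypothetical, and the pattern is an assumption based on the naming rule as commonly documented (a lowercase letter first, then lowercase letters, digits, or hyphens, not ending in a hyphen, at most 64 characters), so verify it against the API reference before relying on it.

```python
import re

# Assumed naming rule, for illustration only; verify against the Cloud Composer API reference.
_ENV_ID_PATTERN = re.compile(r"[a-z](?:[-0-9a-z]{0,62}[0-9a-z])?")


def make_environment_id(dag_id: str, env_id: str, prefix: str = "test") -> str:
    """Build an ID like the example does (underscores become hyphens), then validate it."""
    candidate = f"{prefix}-{dag_id}-{env_id}".replace("_", "-")
    if not _ENV_ID_PATTERN.fullmatch(candidate):
        raise ValueError(f"Invalid environment ID: {candidate!r}")
    return candidate


if __name__ == "__main__":
    print(make_environment_id("composer", "my_env"))
```

Validating the ID locally can surface a naming problem before the create request is sent, instead of after the task has started.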
With this configuration, you can create the environment using CloudComposerCreateEnvironmentOperator:
tests/system/google/cloud/composer/example_cloud_composer.py
create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)
Alternatively, you can define the same operator, CloudComposerCreateEnvironmentOperator, in deferrable mode:
tests/system/google/cloud/composer/example_cloud_composer.py
defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    environment=ENVIRONMENT,
    deferrable=True,
)
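In Airflow, a deferrable operator releases its worker slot while a long-running operation is in progress and hands the wait to an asynchronous trigger. The following is a conceptual sketch of that poll-and-sleep pattern in plain asyncio. It is not the provider's trigger implementation, and `wait_until_done` and `make_fake_operation` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_until_done(
    is_done: Callable[[], Awaitable[bool]],
    poll_interval: float = 0.01,
    max_polls: int = 100,
) -> int:
    """Poll ``is_done`` until it returns True; return the number of polls used."""
    for attempt in range(1, max_polls + 1):
        if await is_done():
            return attempt
        # Sleeping yields control to the event loop, so other waits can progress.
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"Operation not finished after {max_polls} polls")


def make_fake_operation(polls_needed: int) -> Callable[[], Awaitable[bool]]:
    """Return a stand-in status check that reports done on the Nth call."""
    calls = 0

    async def fake_operation_status() -> bool:
        nonlocal calls
        calls += 1
        return calls >= polls_needed

    return fake_operation_status


if __name__ == "__main__":
    print(asyncio.run(wait_until_done(make_fake_operation(3))))
```

Because the wait is a coroutine, many such waits can share one event loop, which is the reason deferrable mode suits operations such as environment creation that can take a long time.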
Get an environment
To get an environment, use CloudComposerGetEnvironmentOperator:
tests/system/google/cloud/composer/example_cloud_composer.py
get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)
List environments
To list environments, use CloudComposerListEnvironmentsOperator:
tests/system/google/cloud/composer/example_cloud_composer.py
list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs", project_id=PROJECT_ID, region=REGION
)
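Update an environment
You can update an environment by providing an environment configuration and an updateMask. In the updateMask parameter, you specify the paths of the fields to update, relative to the environment. For more information about updateMask and the other parameters, see the Cloud Composer update environment API.
An example of a new environment configuration and updateMask: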
tests/system/google/cloud/composer/example_cloud_composer.py
UPDATED_ENVIRONMENT = {
    "labels": {
        "label": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label"]}
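To make the role of the mask concrete, the sketch below applies a patch to a local dictionary and touches only the masked paths. It is a client-side illustration, not what the service runs: the function name `apply_update_mask` is hypothetical, and the rule that a masked path missing from the patch clears the field is an assumption about the API's behavior that should be checked against the update environment reference.

```python
from copy import deepcopy
from typing import Any


def apply_update_mask(
    current: dict[str, Any], patch: dict[str, Any], paths: list[str]
) -> dict[str, Any]:
    """Return a copy of ``current`` where only the masked paths are taken from ``patch``."""
    result = deepcopy(current)
    for path in paths:
        keys = path.split(".")
        # Walk the patch to see whether it carries a value for this path.
        source: Any = patch
        found = True
        for key in keys:
            if isinstance(source, dict) and key in source:
                source = source[key]
            else:
                found = False
                break
        # Walk (and create, if needed) the parent containers in the result.
        target = result
        for key in keys[:-1]:
            target = target.setdefault(key, {})
        if found:
            target[keys[-1]] = deepcopy(source)
        else:
            # Assumption: a masked path absent from the patch clears the field.
            target.pop(keys[-1], None)
    return result


if __name__ == "__main__":
    existing = {"labels": {"label": "old", "team": "data"}}
    print(apply_update_mask(existing, {"labels": {"label": "testing"}}, ["labels.label"]))
```

With the mask `labels.label`, only that one label changes; other labels and any other fields present in the patch are left alone.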
To update the environment, use CloudComposerUpdateEnvironmentOperator:
tests/system/google/cloud/composer/example_cloud_composer.py
update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)
Alternatively, you can define the same operator, CloudComposerUpdateEnvironmentOperator, in deferrable mode:
tests/system/google/cloud/composer/example_cloud_composer.py
defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)
Delete an environment
To delete an environment, use CloudComposerDeleteEnvironmentOperator:
tests/system/google/cloud/composer/example_cloud_composer.py
delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)
Alternatively, you can define the same operator, CloudComposerDeleteEnvironmentOperator, in deferrable mode:
tests/system/google/cloud/composer/example_cloud_composer.py
defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    deferrable=True,
)
List Composer images
You can also list all supported Cloud Composer images using CloudComposerListImageVersionsOperator:
tests/system/google/cloud/composer/example_cloud_composer.py
image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)
Run Airflow CLI commands
You can run Airflow CLI commands in your environment using CloudComposerRunAirflowCLICommandOperator:
tests/system/google/cloud/composer/example_cloud_composer.py
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    command=COMMAND,
)
Alternatively, you can define the same operator in deferrable mode:
tests/system/google/cloud/composer/example_cloud_composer.py
defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="defer_run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    command=COMMAND,
    deferrable=True,
)
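The snippets above do not show how `COMMAND` is defined. It is presumably a string holding an Airflow CLI command, and the value used below (`dags list -o json --verbose`) is only a hypothetical example. As a sketch of how such a string could be broken into a command, a subcommand, and parameters before being sent to the environment, the helper `split_airflow_command` below uses `shlex`. Both the helper and the three-part split are assumptions for illustration, not the operator's documented behavior.

```python
import shlex
from typing import NamedTuple


class AirflowCommand(NamedTuple):
    command: str
    subcommand: str
    parameters: list[str]


def split_airflow_command(raw: str) -> AirflowCommand:
    """Split a CLI string such as 'dags list -o json' into its three parts."""
    tokens = shlex.split(raw)  # honors shell-style quoting
    if not tokens:
        raise ValueError("Empty Airflow CLI command")
    command = tokens[0]
    # Treat the second token as a subcommand only if it is not an option flag.
    if len(tokens) > 1 and not tokens[1].startswith("-"):
        return AirflowCommand(command, tokens[1], tokens[2:])
    return AirflowCommand(command, "", tokens[1:])


if __name__ == "__main__":
    print(split_airflow_command("dags list -o json --verbose"))
```

Using `shlex.split` rather than `str.split` keeps quoted arguments containing spaces together as one parameter.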
Check whether a DAG run has completed
You can use a sensor to check whether a DAG run has completed in your environment, using CloudComposerDAGRunSensor:
tests/system/google/cloud/composer/example_cloud_composer.py
dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
)
Alternatively, you can define the same sensor in deferrable mode:
tests/system/google/cloud/composer/example_cloud_composer.py
defer_dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="defer_dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
    deferrable=True,
)
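The `allowed_states` argument suggests that the sensor succeeds once the DAG runs it inspects are in one of the listed states. The following is a conceptual sketch of such a check over plain dictionaries. The function `dag_runs_match`, the `state` key, and the choice to keep waiting when no runs are found are all assumptions for illustration, not the sensor's actual implementation.

```python
from typing import Iterable, Mapping


def dag_runs_match(
    runs: Iterable[Mapping[str, str]], allowed_states: Iterable[str]
) -> bool:
    """Return True only if there is at least one run and every run is in an allowed state."""
    allowed = set(allowed_states)
    run_list = list(runs)
    # Assumption: with no runs to inspect there is nothing to confirm, so keep waiting.
    if not run_list:
        return False
    return all(run.get("state") in allowed for run in run_list)


if __name__ == "__main__":
    sample_runs = [{"state": "success"}, {"state": "running"}]
    print(dag_runs_match(sample_runs, ["success"]))
```

A sensor built on a check like this would poke repeatedly until the function returns True or the sensor times out.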