Google Cloud Spanner Operator¶
先决条件任务¶
要使用这些 operator,您必须完成一些步骤
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
为您的项目启用结算,如 Google Cloud 文档中所述。
启用 API,如 Cloud Console 文档中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关安装的详细信息,请参阅安装文档。
SpannerDeployInstanceOperator¶
创建新的 Cloud Spanner 实例,或者如果指定项目中已存在具有相同实例 ID 的实例,则更新该 Cloud Spanner 实例。
有关参数定义,请参阅 SpannerDeployInstanceOperator
。
使用 operator¶
您可以创建带或不带项目 ID 的 operator。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。以下显示了两种变体
tests/system/google/cloud/spanner/example_spanner.py
spanner_instance_create_task = SpannerDeployInstanceOperator(
project_id=PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
configuration_name=GCP_SPANNER_CONFIG_NAME,
node_count=GCP_SPANNER_NODE_COUNT,
display_name=GCP_SPANNER_DISPLAY_NAME,
task_id="spanner_instance_create_task",
)
spanner_instance_update_task = SpannerDeployInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
configuration_name=GCP_SPANNER_CONFIG_NAME,
node_count=GCP_SPANNER_NODE_COUNT + 1,
display_name=GCP_SPANNER_DISPLAY_NAME + "_updated",
task_id="spanner_instance_update_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"configuration_name",
"display_name",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud Spanner API 文档以创建新实例。
SpannerDeleteDatabaseInstanceOperator¶
从指定的 Cloud Spanner 实例中删除数据库。如果数据库不存在,则不执行任何操作,operator 将成功完成。
有关参数定义,请参阅 SpannerDeleteDatabaseInstanceOperator
。
使用 operator¶
您可以创建带或不带项目 ID 的 operator。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。以下显示了两种变体
tests/system/google/cloud/spanner/example_spanner.py
spanner_database_delete_task = SpannerDeleteDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
task_id="spanner_database_delete_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
SpannerDeployDatabaseInstanceOperator¶
在指定的实例中创建新的 Cloud Spanner 数据库,或者如果所需数据库已存在,则假定成功,并且不更改数据库配置。不对数据库结构进行验证 - 只要数据库以相同的名称存在即可。
有关参数定义,请参阅 SpannerDeployDatabaseInstanceOperator
。
使用 operator¶
您可以创建带或不带项目 ID 的 operator。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。以下显示了两种变体
tests/system/google/cloud/spanner/example_spanner.py
spanner_database_deploy_task = SpannerDeployDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
ddl_statements=[
"CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
"CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_deploy_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"database_id",
"ddl_statements",
"gcp_conn_id",
"impersonation_chain",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"ddl_statements": "sql"}
更多信息¶
请参阅 Google Cloud Spanner API 文档以创建新数据库。
SpannerUpdateDatabaseInstanceOperator¶
在 Cloud Spanner 数据库中运行 DDL 查询,允许您修改现有数据库的结构。
您可以选择指定 operation_id 参数,这有助于确定在 update_database 调用被重放时语句是否已执行(幂等性检查)。operation_id 在数据库中必须唯一,并且必须是有效的标识符:[a-z][a-z0-9_]*
。更多信息可以在 updateDdl API 文档中找到
有关参数定义,请参阅 SpannerUpdateDatabaseInstanceOperator
。
使用 operator¶
您可以创建带或不带项目 ID 的 operator。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。以下显示了两种变体
tests/system/google/cloud/spanner/example_spanner.py
spanner_database_update_task = SpannerUpdateDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
ddl_statements=[
"CREATE TABLE my_table3 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_update_task",
)
tests/system/google/cloud/spanner/example_spanner.py
spanner_database_update_idempotent1_task = SpannerUpdateDatabaseInstanceOperator(
project_id=PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
operation_id=OPERATION_ID,
ddl_statements=[
"CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_update_idempotent1_task",
)
spanner_database_update_idempotent2_task = SpannerUpdateDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
operation_id=OPERATION_ID,
ddl_statements=[
"CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_update_idempotent2_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"database_id",
"ddl_statements",
"gcp_conn_id",
"impersonation_chain",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"ddl_statements": "sql"}
更多信息¶
请参阅 Google Cloud Spanner API 文档了解数据库 update_ddl。
SpannerQueryDatabaseInstanceOperator¶
执行任意 DML 查询(INSERT, UPDATE, DELETE)。
有关参数定义,请参阅 SpannerQueryDatabaseInstanceOperator
。
使用 operator¶
您可以创建带或不带项目 ID 的 operator。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。以下显示了两种变体
tests/system/google/cloud/spanner/example_spanner.py
spanner_instance_query_task = SpannerQueryDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
query=["DELETE FROM my_table2 WHERE true"],
task_id="spanner_instance_query_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"database_id",
"query",
"gcp_conn_id",
"impersonation_chain",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"query": "sql"}
更多信息¶
有关 DML 语法的更多信息,请参阅 Google Cloud Spanner API 文档。
SpannerDeleteInstanceOperator¶
删除 Cloud Spanner 实例。如果实例不存在,则不执行任何操作,operator 将成功完成。
有关参数定义,请参阅 SpannerDeleteInstanceOperator
。
使用 operator¶
您可以创建带或不带项目 ID 的 operator。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。以下显示了两种变体
tests/system/google/cloud/spanner/example_spanner.py
spanner_instance_delete_task = SpannerDeleteInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID, task_id="spanner_instance_delete_task"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud Spanner API 文档以删除实例。
参考¶
更多信息,请参阅