Google Cloud Bigtable 操作符¶
先决条件任务¶
要使用这些操作符,您必须执行一些操作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
为您的项目启用结算功能,具体操作请参阅 Google Cloud 文档。
启用 API,具体操作请参阅 Cloud Console 文档。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'详细信息请参阅 安装。
BigtableCreateInstanceOperator¶
使用 BigtableCreateInstanceOperator
创建 Google Cloud Bigtable 实例。
如果具有给定 ID 的 Cloud Bigtable 实例已存在,操作符不会比较其配置并立即成功。对现有实例不进行任何更改。
使用操作符¶
您可以创建带或不带项目 ID 的操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。此处展示了两种变体
tests/system/google/cloud/bigtable/example_bigtable.py
create_instance_task = BigtableCreateInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=CBT_INSTANCE_TYPE, # type: ignore[arg-type]
instance_labels=CBT_INSTANCE_LABELS,
cluster_nodes=None,
cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE, # type: ignore[arg-type]
task_id="create_instance_task",
)
BigtableUpdateInstanceOperator¶
使用 BigtableUpdateInstanceOperator
更新现有的 Google Cloud Bigtable 实例。
对于现有实例,只能更新以下配置:instance_display_name、instance_type 和 instance_labels。
使用操作符¶
您可以创建带或不带项目 ID 的操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。此处展示了两种变体
tests/system/google/cloud/bigtable/example_bigtable.py
update_instance_task = BigtableUpdateInstanceOperator(
instance_id=CBT_INSTANCE_ID_1,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
instance_type=CBT_INSTANCE_TYPE_PROD,
instance_labels=CBT_INSTANCE_LABELS_UPDATED,
task_id="update_instance_task",
)
BigtableDeleteInstanceOperator¶
使用 BigtableDeleteInstanceOperator
删除 Google Cloud Bigtable 实例。
使用操作符¶
您可以创建带或不带项目 ID 的操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。此处展示了两种变体
tests/system/google/cloud/bigtable/example_bigtable.py
delete_instance_task = BigtableDeleteInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
task_id="delete_instance_task",
)
BigtableUpdateClusterOperator¶
使用 BigtableUpdateClusterOperator
修改 Cloud Bigtable 集群中的节点数量。
使用操作符¶
您可以创建带或不带项目 ID 的操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。此处展示了两种变体
tests/system/google/cloud/bigtable/example_bigtable.py
cluster_update_task = BigtableUpdateClusterOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
cluster_id=CBT_CLUSTER_ID,
nodes=CBT_CLUSTER_NODES_UPDATED,
task_id="update_cluster_task",
)
BigtableCreateTableOperator¶
在 Cloud Bigtable 实例中创建表。
如果具有给定 ID 的表已存在于 Cloud Bigtable 实例中,操作符将比较列族。如果列族相同,操作符将成功。否则,操作符将返回适当的错误消息并失败。
使用操作符¶
您可以创建带或不带项目 ID 的操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。此处展示了两种变体
tests/system/google/cloud/bigtable/example_bigtable.py
create_table_task = BigtableCreateTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
table_id=CBT_TABLE_ID,
task_id="create_table",
)
高级¶
创建表时,您可以指定可选的 initial_split_keys
和 column_families
。请参考 Python Client for Google Cloud Bigtable 文档中的 Table 和 Column Families 部分。
BigtableDeleteTableOperator¶
使用 BigtableDeleteTableOperator
删除 Google Cloud Bigtable 中的表。
使用操作符¶
您可以创建带或不带项目 ID 的操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。此处展示了两种变体
tests/system/google/cloud/bigtable/example_bigtable.py
delete_table_task = BigtableDeleteTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
table_id=CBT_TABLE_ID,
task_id="delete_table_task",
)
BigtableTableReplicationCompletedSensor¶
您可以创建带或不带项目 ID 的操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。此处展示了两种变体
使用 BigtableTableReplicationCompletedSensor
等待表完全复制完成。
适用于 BigtableCreateTableOperator 的相同参数也适用于此传感器。
注意: 如果表或 Cloud Bigtable 实例不存在,此传感器将一直等待直到超时,并且不会引发任何异常。
使用操作符¶
tests/system/google/cloud/bigtable/example_bigtable.py
wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
instance_id=CBT_INSTANCE_ID_2,
table_id=CBT_TABLE_ID,
poke_interval=CBT_POKE_INTERVAL,
timeout=180,
task_id="wait_for_table_replication_task2",
)
参考¶
更多信息,请参阅