Google Cloud Datastore Operators
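Firestore in Datastore mode is a NoSQL document database built for automatic scaling, high performance, and ease of application development.
For more information about the service, visit the Datastore product documentation.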
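Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available in Installation.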
Export Entities
To export entities from Google Cloud Datastore to Cloud Storage, use CloudDatastoreExportEntitiesOperator.
tests/system/google/cloud/datastore/example_datastore_commit.py
export_task = CloudDatastoreExportEntitiesOperator(
task_id="export_task",
bucket=BUCKET_NAME,
project_id=PROJECT_ID,
overwrite_existing=True,
)
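You may only need part of the database. The export operator also takes an `entity_filter` argument, documented as following the Datastore REST `EntityFilter` type. The sketch below builds such a dict with a hypothetical helper, `make_entity_filter`. The helper and the choice of the `airflow` kind are illustrative assumptions and do not come from the example DAG.

```python
from __future__ import annotations


def make_entity_filter(
    kinds: list[str] | None = None,
    namespace_ids: list[str] | None = None,
) -> dict[str, list[str]]:
    """Build an EntityFilter-shaped dict (hypothetical helper).

    In the EntityFilter type an empty list means "all kinds" or
    "all namespaces", so omitted arguments become empty lists.
    """
    return {
        "kinds": list(kinds or []),
        "namespaceIds": list(namespace_ids or []),
    }


# Export only the "airflow" kind from the default namespace (the empty string).
ENTITY_FILTER = make_entity_filter(kinds=["airflow"], namespace_ids=[""])
```

Under that assumption, the dict would be passed as `entity_filter=ENTITY_FILTER`, next to the arguments already shown in `export_task`.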
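Import Entities
To import entities from Cloud Storage to Google Cloud Datastore, use CloudDatastoreImportEntitiesOperator. In this example, bucket and file are templated from the outputUrl that export_task pushes to XCom.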
tests/system/google/cloud/datastore/example_datastore_commit.py
import_task = CloudDatastoreImportEntitiesOperator(
task_id="import_task",
bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
project_id=PROJECT_ID,
)
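The two Jinja templates in `import_task` take the export's `outputUrl` (a `gs://` URL read from the `export_task` XCom) and split it into a bucket name and an object path. The plain-Python function below, `split_gcs_url` (a hypothetical name), reproduces that string manipulation so you can check what the templates render for a given URL. The sample URL is made up.

```python
from __future__ import annotations


def split_gcs_url(output_url: str) -> tuple[str, str]:
    """Split a gs:// URL the same way the import_task templates do."""
    parts = output_url.split("/")
    # "gs://bucket/a/b".split("/") -> ["gs:", "", "bucket", "a", "b"]
    bucket = parts[2]  # matches .split('/')[2] in the bucket template
    file = "/".join(parts[3:])  # matches '/'.join(....split('/')[3:]) in the file template
    return bucket, file


# Illustrative URL only; a real export chooses its own object path.
bucket, file = split_gcs_url("gs://example-bucket/export-123/export-123.overall_export_metadata")
```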
Allocate IDs
To allocate IDs for incomplete keys, use CloudDatastoreAllocateIdsOperator.
tests/system/google/cloud/datastore/example_datastore_commit.py
allocate_ids = CloudDatastoreAllocateIdsOperator(
task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)
Example of the partial keys required by the operator:
tests/system/google/cloud/datastore/example_datastore_commit.py
KEYS = [
{
"partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
"path": {"kind": "airflow"},
}
]
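The Datastore allocateIds method assigns one ID to each incomplete key in the request. To reserve several IDs, you pass several partial keys. The hypothetical helper below builds such a list with the same structure as `KEYS` above. `example-project` stands in for `PROJECT_ID`.

```python
from __future__ import annotations

from typing import Any


def make_partial_keys(
    project_id: str, kind: str, count: int, namespace_id: str = ""
) -> list[dict[str, Any]]:
    """Build `count` incomplete keys shaped like KEYS (hypothetical helper)."""
    # Each key has a kind but no id or name, so it is incomplete.
    # The comprehension creates a fresh dict for every key.
    return [
        {
            "partitionId": {"projectId": project_id, "namespaceId": namespace_id},
            "path": {"kind": kind},
        }
        for _ in range(count)
    ]


KEYS_BATCH = make_partial_keys("example-project", "airflow", count=3)
```

A list like `KEYS_BATCH` could then be passed as `partial_keys` to the operator.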
Begin Transaction
To begin a new transaction, use CloudDatastoreBeginTransactionOperator.
tests/system/google/cloud/datastore/example_datastore_commit.py
begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
task_id="begin_transaction_commit",
transaction_options=TRANSACTION_OPTIONS,
project_id=PROJECT_ID,
)
Example of the transaction options required by the operator:
tests/system/google/cloud/datastore/example_datastore_commit.py
TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}
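`TRANSACTION_OPTIONS` follows the Datastore `TransactionOptions` type. In that type a transaction is either `readWrite` or `readOnly`, and both accept an empty object. The hypothetical helper below produces either variant. The example DAG itself only uses the read-write form.

```python
from __future__ import annotations

from typing import Any


def make_transaction_options(read_only: bool = False) -> dict[str, Any]:
    """Return a TransactionOptions-shaped dict (hypothetical helper)."""
    # Exactly one of "readWrite" / "readOnly" is set.
    mode = "readOnly" if read_only else "readWrite"
    return {mode: {}}


READ_WRITE_OPTIONS = make_transaction_options()
READ_ONLY_OPTIONS = make_transaction_options(read_only=True)
```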
Commit Transaction
To commit a transaction, optionally creating, deleting, or modifying some entities, use CloudDatastoreCommitOperator.
tests/system/google/cloud/datastore/example_datastore_commit.py
commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
Example of the commit information required by the operator:
tests/system/google/cloud/datastore/example_datastore_commit.py
COMMIT_BODY = {
"mode": "TRANSACTIONAL",
"mutations": [
{
"insert": {
"key": KEYS[0],
"properties": {"string": {"stringValue": "airflow is awesome!"}},
}
}
],
"singleUseTransaction": {"readWrite": {}},
}
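`COMMIT_BODY` uses `singleUseTransaction`, so the commit runs in its own one-off read-write transaction. A Datastore `CommitRequest` can instead reference an existing transaction through its `transaction` field, for example the ID returned by `begin_transaction_commit`. The hypothetical helper below builds either form. The key, project ID, and transaction ID are placeholders.

```python
from __future__ import annotations

from typing import Any


def make_commit_body(
    mutations: list[dict[str, Any]], transaction: str | None = None
) -> dict[str, Any]:
    """Build a transactional CommitRequest body (hypothetical helper)."""
    body: dict[str, Any] = {"mode": "TRANSACTIONAL", "mutations": list(mutations)}
    if transaction is None:
        # No existing transaction: ask Datastore for a one-off read-write one.
        body["singleUseTransaction"] = {"readWrite": {}}
    else:
        # Commit inside a transaction opened earlier, e.g. by BeginTransaction.
        body["transaction"] = transaction
    return body


KEY = {
    "partitionId": {"projectId": "example-project", "namespaceId": ""},
    "path": {"kind": "airflow"},
}
INSERT = {"insert": {"key": KEY, "properties": {"string": {"stringValue": "airflow is awesome!"}}}}

single_use_body = make_commit_body([INSERT])
in_txn_body = make_commit_body([INSERT], transaction="example-transaction-id")
```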
Run Query
To run a query for entities, use CloudDatastoreRunQueryOperator.
tests/system/google/cloud/datastore/example_datastore_query.py
run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)
Example of the query required by the operator:
tests/system/google/cloud/datastore/example_datastore_query.py
QUERY = {
"partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
"readOptions": {"transaction": begin_transaction_query.output},
"query": {},
}
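`QUERY` sends an empty `query` object. In the Datastore `Query` type, `kind` is a list of kind expressions and `limit` caps the number of results. A more selective request might therefore look like the output of the hypothetical helper below. The project ID, kind, and limit are illustrative.

```python
from __future__ import annotations

from typing import Any


def make_run_query_body(
    project_id: str,
    kind: str,
    limit: int,
    namespace_id: str = "",
    transaction: str | None = None,
) -> dict[str, Any]:
    """Build a RunQuery request body that filters by kind (hypothetical helper)."""
    body: dict[str, Any] = {
        "partitionId": {"projectId": project_id, "namespaceId": namespace_id},
        # Query.kind is a list of KindExpression objects; limit caps the result count.
        "query": {"kind": [{"name": kind}], "limit": limit},
    }
    if transaction is not None:
        # Read inside an existing transaction, as QUERY does above.
        body["readOptions"] = {"transaction": transaction}
    return body


QUERY_BY_KIND = make_run_query_body("example-project", "airflow", limit=10, namespace_id="query")
```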
Roll Back Transaction
To roll back a transaction, use CloudDatastoreRollbackOperator.
tests/system/google/cloud/datastore/example_datastore_rollback.py
rollback_transaction = CloudDatastoreRollbackOperator(
task_id="rollback_transaction",
transaction=begin_transaction_to_rollback.output,
)
Get Operation State
To get the current state of a long-running operation, use CloudDatastoreGetOperationOperator.
tests/system/google/cloud/datastore/example_datastore_commit.py
get_operation = CloudDatastoreGetOperationOperator(
task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)
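`get_operation` returns the operation resource. Assuming that resource has the standard `google.longrunning.Operation` JSON shape, `done` stays false (or absent) while the work continues, and a finished operation carries either `error` or `response`. The hypothetical helper below reads that dict, for example in a downstream Python task. The sample dicts in the test are hand-written, not real API output.

```python
from __future__ import annotations

from typing import Any


def operation_outcome(operation: dict[str, Any]) -> str:
    """Classify a long-running Operation dict (hypothetical helper)."""
    # "done" may be omitted, which means false: the operation is still running.
    if not operation.get("done", False):
        return "running"
    # A finished operation carries either "error" or "response".
    return "failed" if "error" in operation else "succeeded"
```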
Delete Operation
To delete an operation, use CloudDatastoreDeleteOperationOperator.
tests/system/google/cloud/datastore/example_datastore_commit.py
delete_export_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_export_operation",
name="{{ task_instance.xcom_pull('export_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)
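`trigger_rule=TriggerRule.ALL_DONE` makes the cleanup task run once all of its upstream tasks have finished, whether they succeeded or failed. As a result, the export operation is deleted even if an earlier step fails. The toy function below models only that rule, using a reduced, assumed set of state names. It illustrates the idea and is not Airflow's scheduler logic.

```python
from __future__ import annotations

# Simplified, assumed set of finished states; Airflow tracks more states than these.
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def all_done(upstream_states: list[str]) -> bool:
    """Simplified model of TriggerRule.ALL_DONE (not Airflow's implementation)."""
    # ALL_DONE only waits for upstream tasks to finish; it ignores their outcome.
    return all(state in FINISHED_STATES for state in upstream_states)
```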
References
For further information, look at: