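Google Cloud Datastore Operators

Firestore in Datastore mode is a NoSQL document database built for automatic scaling, high performance, and ease of application development.

For more information about the service, see the Datastore product documentation.

Prerequisite Tasks

To use these operators, you must complete a few steps first.

Export Entities

To export entities from Google Cloud Datastore to Cloud Storage, use CloudDatastoreExportEntitiesOperator.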

tests/system/google/cloud/datastore/example_datastore_commit.py

export_task = CloudDatastoreExportEntitiesOperator(
    task_id="export_task",
    bucket=BUCKET_NAME,
    project_id=PROJECT_ID,
    overwrite_existing=True,
)
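An export can also be narrowed to specific kinds or namespaces. The following is a minimal sketch, assuming the operator's `entity_filter` argument accepts a Datastore `EntityFilter` dictionary (`kinds`, `namespaceIds`); the bucket, project, and task names are hypothetical. It only builds the keyword arguments as plain Python so the shape can be inspected without Airflow installed.

```python
from __future__ import annotations

from typing import Any

# Hypothetical values for illustration only; replace with your own.
BUCKET_NAME = "example-bucket"
PROJECT_ID = "example-project"

# Keyword arguments for the export operator. "entity_filter" is assumed to
# follow the Datastore EntityFilter shape; an empty string in "namespaceIds"
# stands for the default namespace.
export_kwargs: dict[str, Any] = {
    "task_id": "export_selected_kinds",
    "bucket": BUCKET_NAME,
    "project_id": PROJECT_ID,
    "entity_filter": {"kinds": ["airflow"], "namespaceIds": [""]},
    "overwrite_existing": True,
}

# In a DAG file this would be passed as:
#   CloudDatastoreExportEntitiesOperator(**export_kwargs)
print(export_kwargs["entity_filter"])
```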

Import Entities

To import entities from Cloud Storage into Google Cloud Datastore, use CloudDatastoreImportEntitiesOperator.

tests/system/google/cloud/datastore/example_datastore_commit.py

import_task = CloudDatastoreImportEntitiesOperator(
    task_id="import_task",
    bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
    file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
    project_id=PROJECT_ID,
)
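The two templated fields above derive the bucket and the object path from the `outputUrl` value that the export task pushes to XCom. The same splitting logic can be sketched in plain Python, assuming the URL has the form `gs://<bucket>/<path>`; the function name and the sample URL are hypothetical.

```python
from __future__ import annotations


def split_output_url(output_url: str) -> tuple[str, str]:
    """Split a gs:// URL into (bucket, object path), mirroring the Jinja expressions."""
    parts = output_url.split("/")
    # parts[0] is "gs:", parts[1] is the empty string between the two slashes,
    # parts[2] is the bucket, and everything after that is the object path.
    bucket = parts[2]
    file = "/".join(parts[3:])
    return bucket, file


# Hypothetical URL, for illustration only.
bucket, file = split_output_url(
    "gs://example-bucket/exports/run1/run1.overall_export_metadata"
)
print(bucket)
print(file)
```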

Allocate IDs

To allocate IDs for incomplete keys, use CloudDatastoreAllocateIdsOperator.

tests/system/google/cloud/datastore/example_datastore_commit.py

allocate_ids = CloudDatastoreAllocateIdsOperator(
    task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)

An example of the partial keys required by the operator:

tests/system/google/cloud/datastore/example_datastore_commit.py

KEYS = [
    {
        "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
        "path": {"kind": "airflow"},
    }
]
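When several IDs are needed at once, the list of partial keys can be generated rather than written by hand. This is a minimal sketch that mirrors the key shape shown above; the helper name `partial_key` and the project ID are hypothetical and not part of the provider.

```python
from __future__ import annotations

from typing import Any


def partial_key(project_id: str, kind: str, namespace_id: str = "") -> dict[str, Any]:
    """Build one incomplete key (kind only, no ID) in the shape used above."""
    return {
        "partitionId": {"projectId": project_id, "namespaceId": namespace_id},
        "path": {"kind": kind},
    }


# Three incomplete keys of the same kind; each would receive its own ID.
keys = [partial_key("example-project", "airflow") for _ in range(3)]
print(len(keys))
```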

Begin Transaction

To begin a new transaction, use CloudDatastoreBeginTransactionOperator.

tests/system/google/cloud/datastore/example_datastore_commit.py

begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
    task_id="begin_transaction_commit",
    transaction_options=TRANSACTION_OPTIONS,
    project_id=PROJECT_ID,
)

An example of the transaction options required by the operator:

tests/system/google/cloud/datastore/example_datastore_commit.py

TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}
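The transaction options follow the Datastore REST API's `TransactionOptions` object, which selects either a read-write or a read-only transaction. The sketch below shows both variants as plain dictionaries; the constant names and the `retry_options` helper are hypothetical, and the `previousTransaction` field is taken from the REST API rather than from this guide.

```python
from __future__ import annotations

from typing import Any

# Read-write transaction, as in the example above.
READ_WRITE_OPTIONS: dict[str, Any] = {"readWrite": {}}

# Read-only transaction, for tasks that only query data.
READ_ONLY_OPTIONS: dict[str, Any] = {"readOnly": {}}


def retry_options(previous_transaction: str) -> dict[str, Any]:
    """Options for retrying a read-write transaction that previously failed."""
    return {"readWrite": {"previousTransaction": previous_transaction}}


print(retry_options("example-transaction-id"))
```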

Commit Transaction

To commit a transaction, optionally creating, deleting, or modifying some entities, use CloudDatastoreCommitOperator.

tests/system/google/cloud/datastore/example_datastore_commit.py

commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)

An example of the commit information required by the operator:

tests/system/google/cloud/datastore/example_datastore_commit.py

COMMIT_BODY = {
    "mode": "TRANSACTIONAL",
    "mutations": [
        {
            "insert": {
                "key": KEYS[0],
                "properties": {"string": {"stringValue": "airflow is awesome!"}},
            }
        }
    ],
    "singleUseTransaction": {"readWrite": {}},
}
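A commit body can carry more than one mutation. The following minimal sketch builds a body of the same shape as the one above from a list of key and property pairs; the helper name `insert_commit_body` and the sample values are hypothetical.

```python
from __future__ import annotations

from typing import Any


def insert_commit_body(entities: list[tuple[dict, dict]]) -> dict[str, Any]:
    """Build a transactional commit body with one insert mutation per entity."""
    return {
        "mode": "TRANSACTIONAL",
        "mutations": [
            {"insert": {"key": key, "properties": properties}}
            for key, properties in entities
        ],
        # Same single-use read-write transaction as in the example above.
        "singleUseTransaction": {"readWrite": {}},
    }


# Hypothetical partial key, mirroring the shape used earlier in this guide.
key = {
    "partitionId": {"projectId": "example-project", "namespaceId": ""},
    "path": {"kind": "airflow"},
}
body = insert_commit_body(
    [
        (key, {"string": {"stringValue": "first"}}),
        (key, {"string": {"stringValue": "second"}}),
    ]
)
print(len(body["mutations"]))
```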

Run Query

To run a query for entities, use CloudDatastoreRunQueryOperator.

tests/system/google/cloud/datastore/example_datastore_query.py

run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)

An example of the query required by the operator:

tests/system/google/cloud/datastore/example_datastore_query.py

QUERY = {
    "partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
    "readOptions": {"transaction": begin_transaction_query.output},
    "query": {},
}
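The empty `query` object above places no restriction on the results. As a minimal sketch, assuming the body follows the Datastore REST API's `runQuery` request (where `query.kind` is a list of `{"name": ...}` objects and `query.limit` is an integer), a query restricted to one kind might be built as follows. The helper name and the sample values are hypothetical.

```python
from __future__ import annotations

from typing import Any


def kind_query_body(
    project_id: str,
    namespace_id: str,
    kind: str,
    limit: int,
    transaction: str | None = None,
) -> dict[str, Any]:
    """Build a runQuery body that returns at most `limit` entities of one kind."""
    body: dict[str, Any] = {
        "partitionId": {"projectId": project_id, "namespaceId": namespace_id},
        "query": {"kind": [{"name": kind}], "limit": limit},
    }
    # Only read inside a transaction when an identifier is supplied.
    if transaction is not None:
        body["readOptions"] = {"transaction": transaction}
    return body


query = kind_query_body("example-project", "query", "airflow", limit=10)
print(query["query"])
```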

Roll Back Transaction

To roll back a transaction, use CloudDatastoreRollbackOperator.

tests/system/google/cloud/datastore/example_datastore_rollback.py

rollback_transaction = CloudDatastoreRollbackOperator(
    task_id="rollback_transaction",
    transaction=begin_transaction_to_rollback.output,
)

Get Operation State

To get the current state of a long-running operation, use CloudDatastoreGetOperationOperator.

tests/system/google/cloud/datastore/example_datastore_commit.py

get_operation = CloudDatastoreGetOperationOperator(
    task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)
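A downstream task may want to inspect the operation it receives. The sketch below assumes the operation is a dictionary shaped like the Datastore Admin API's long-running operation resource, with the state stored under `metadata.common.state`; the helper names and the sample operation are hypothetical.

```python
from __future__ import annotations

from typing import Any


def operation_state(operation: dict[str, Any]) -> str:
    """Return the state string of an operation, or STATE_UNSPECIFIED if absent."""
    return (
        operation.get("metadata", {})
        .get("common", {})
        .get("state", "STATE_UNSPECIFIED")
    )


def is_processing(operation: dict[str, Any]) -> bool:
    """True while the operation reports that it is still being processed."""
    return operation_state(operation) == "PROCESSING"


# Hypothetical operation resource, for illustration only.
sample = {
    "name": "projects/example-project/operations/example-operation",
    "metadata": {"common": {"state": "SUCCESSFUL"}},
    "done": True,
}
print(operation_state(sample))
```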

Delete Operation

To delete an operation, use CloudDatastoreDeleteOperationOperator.

tests/system/google/cloud/datastore/example_datastore_commit.py

delete_export_operation = CloudDatastoreDeleteOperationOperator(
    task_id="delete_export_operation",
    name="{{ task_instance.xcom_pull('export_task')['name'] }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

References

For further information, see:
