Google Dataform Operators

Dataform is a service that lets data analysts develop, test, version-control, and schedule complex SQL workflows for data transformation in BigQuery.

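Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.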

For more information, see the Dataform documentation.

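Configuration

Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, see the Dataform documentation.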

Create Repository

To create a repository for tracking your code in the Dataform service, use DataformCreateRepositoryOperator. An example of usage is shown below:

tests/system/google/cloud/dataform/example_dataform.py

    make_repository = DataformCreateRepositoryOperator(
        task_id="make-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
    )

Create Workspace

To create a workspace for storing your code in the Dataform service, use DataformCreateWorkspaceOperator. An example of usage is shown below:

tests/system/google/cloud/dataform/example_dataform.py

    make_workspace = DataformCreateWorkspaceOperator(
        task_id="make-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
    )

Create Compilation Result

To create a Compilation Result, use DataformCreateCompilationResultOperator. A simple configuration might look like this:

tests/system/google/cloud/dataform/example_dataform.py

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create-compilation-result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main",
            "workspace": (
                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
                f"workspaces/{WORKSPACE_ID}"
            ),
        },
    )
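The `workspace` value in the configuration above is a fully qualified resource name. As a minimal sketch, a small helper can assemble it from its parts. The function `build_workspace_path` and the sample IDs are hypothetical and are not part of the Airflow provider.

```python
def build_workspace_path(
    project_id: str, region: str, repository_id: str, workspace_id: str
) -> str:
    """Assemble the workspace resource name in the format used in the example above."""
    return (
        f"projects/{project_id}/locations/{region}/repositories/{repository_id}/"
        f"workspaces/{workspace_id}"
    )


# Illustrative values only; substitute your own project, region, and IDs.
workspace_path = build_workspace_path(
    "my-project", "us-central1", "my-repo", "my-workspace"
)
print(workspace_path)
```

Keeping the path construction in one place avoids repeating the long f-string wherever a workspace has to be referenced.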

Get Compilation Result

To get a Compilation Result, use DataformGetCompilationResultOperator:

tests/system/google/cloud/dataform/example_dataform.py

    get_compilation_result = DataformGetCompilationResultOperator(
        task_id="get-compilation-result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result_id=(
            "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
        ),
    )
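The templated `compilation_result_id` above pulls the value pushed to XCom by the `create-compilation-result` task and keeps only the last segment of its `name`. The plain-Python sketch below mirrors that expression outside of Airflow. The helper `last_path_segment`, the sample resource name, and the `abc123` ID are made up for illustration.

```python
def last_path_segment(resource_name: str) -> str:
    """Return the trailing ID of a slash-separated resource name."""
    return resource_name.split("/")[-1]


# Hypothetical XCom payload: a dict with a fully qualified "name" field.
xcom_value = {
    "name": (
        "projects/my-project/locations/us-central1/repositories/my-repo/"
        "compilationResults/abc123"
    )
}

compilation_result_id = last_path_segment(xcom_value["name"])
print(compilation_result_id)  # abc123
```

The same pattern appears in the workflow invocation examples, where the trailing segment of the invocation `name` is used as `workflow_invocation_id`.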

Create Workflow Invocation

To create a Workflow Invocation, use DataformCreateWorkflowInvocationOperator:

tests/system/google/cloud/dataform/example_dataform.py

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create-workflow-invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
        },
    )

This operation can run in synchronous or asynchronous mode. For asynchronous runs, a sensor is also available: DataformWorkflowInvocationStateSensor.

tests/system/google/cloud/dataform/example_dataform.py

    create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
        task_id="create-workflow-invocation-async",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        asynchronous=True,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
        },
    )

    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is-workflow-invocation-done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
        ),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
    )

Another sensor, DataformWorkflowInvocationActionStateSensor, checks the state of a specific action of an asynchronously triggered workflow invocation.

tests/system/google/cloud/dataform/example_dataform.py

    is_workflow_invocation_action_done = DataformWorkflowInvocationActionStateSensor(
        task_id="is-workflow-invocation-action-done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
        ),
        target_name="first_view",
        expected_statuses={WorkflowInvocationAction.State.SUCCEEDED},
        failure_statuses={
            WorkflowInvocationAction.State.SKIPPED,
            WorkflowInvocationAction.State.DISABLED,
            WorkflowInvocationAction.State.CANCELLED,
            WorkflowInvocationAction.State.FAILED,
        },
    )
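The interplay of `expected_statuses` and `failure_statuses` can be pictured with a small stand-alone sketch. This is not the provider's implementation: `ActionState` is a stand-in enum for `WorkflowInvocationAction.State`, and `check_state` is a hypothetical function. It assumes that a failure state raises an error, an expected state ends the wait, and any other state means the sensor keeps polling.

```python
from enum import Enum


class ActionState(Enum):
    """Stand-in for WorkflowInvocationAction.State (assumption, for illustration)."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SKIPPED = "SKIPPED"
    DISABLED = "DISABLED"
    SUCCEEDED = "SUCCEEDED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"


def check_state(state, expected, failure):
    """Return True when done, False to keep polling, and raise on a failure state."""
    if state in failure:
        raise RuntimeError(f"Action ended in failure state: {state.name}")
    return state in expected


expected = {ActionState.SUCCEEDED}
failure = {
    ActionState.SKIPPED,
    ActionState.DISABLED,
    ActionState.CANCELLED,
    ActionState.FAILED,
}

print(check_state(ActionState.RUNNING, expected, failure))    # False
print(check_state(ActionState.SUCCEEDED, expected, failure))  # True
```

Listing the failure states explicitly lets a task stop early instead of waiting for a timeout when the action can no longer succeed.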

Get Workflow Invocation

To get a Workflow Invocation, use DataformGetWorkflowInvocationOperator:

tests/system/google/cloud/dataform/example_dataform.py

    get_workflow_invocation = DataformGetWorkflowInvocationOperator(
        task_id="get-workflow-invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
        ),
    )

Query Workflow Invocation Actions

To query Workflow Invocation Actions, use DataformQueryWorkflowInvocationActionsOperator:

tests/system/google/cloud/dataform/example_dataform.py

    query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
        task_id="query-workflow-invocation-actions",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
        ),
    )

Cancel Workflow Invocation

To cancel a Workflow Invocation, use DataformCancelWorkflowInvocationOperator:

tests/system/google/cloud/dataform/example_dataform.py

    cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
        task_id="cancel-workflow-invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
        ),
    )

Delete Repository

To delete a repository, use DataformDeleteRepositoryOperator. An example of usage is shown below:

tests/system/google/cloud/dataform/example_dataform.py

    delete_repository = DataformDeleteRepositoryOperator(
        task_id="delete-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Delete Workspace

To delete a workspace, use DataformDeleteWorkspaceOperator. An example of usage is shown below:

tests/system/google/cloud/dataform/example_dataform.py

    delete_workspace = DataformDeleteWorkspaceOperator(
        task_id="delete-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Remove File

To remove a file, use DataformRemoveFileOperator. An example of usage is shown below:

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_file = DataformRemoveFileOperator(
        task_id="remove-test-file",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        filepath="test/test.txt",
    )

Remove Directory

To remove a directory, use DataformRemoveDirectoryOperator. An example of usage is shown below:

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_directory = DataformRemoveDirectoryOperator(
        task_id="remove-test-directory",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        directory_path="test",
    )

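Initialize Workspace

Creates the default project structure for the provided workspace. The repository and the workspace must be created first. An example of usage is shown below: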

tests/system/google/cloud/dataform/example_dataform.py

    first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        package_name=f"dataform_package_{ENV_ID}",
        without_installation=True,
        dataform_schema_name=DATAFORM_SCHEMA_NAME,
    )

Write File to Workspace

To write a file with the given content to the specified workspace, use DataformWriteFileOperator:

tests/system/google/cloud/dataform/example_dataform.py

    test_file_content = b"""
    test test for test file
    """
    write_test_file = DataformWriteFileOperator(
        task_id="make-test-file",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        filepath="test/test.txt",
        contents=test_file_content,
    )
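The example passes `contents` as a bytes literal. When the file content starts out as a regular string, it can be encoded first. The sketch below is a hedged illustration: the SQLX text is hypothetical, and UTF-8 is an assumed encoding rather than a documented requirement.

```python
# Hypothetical SQLX definition held as a regular string.
sqlx_text = """config { type: "view" }
SELECT 1 AS id
"""

# Encode to bytes so the value has the same type as the bytes literal
# passed to `contents` in the example above (UTF-8 is an assumption).
sqlx_bytes = sqlx_text.encode("utf-8")

print(type(sqlx_bytes).__name__)
```

The resulting `sqlx_bytes` value could then be passed as `contents` in place of `test_file_content`.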

Make Directory in Workspace

To create a directory with the given path in the specified workspace, use DataformMakeDirectoryOperator:

tests/system/google/cloud/dataform/example_dataform.py

    make_test_directory = DataformMakeDirectoryOperator(
        task_id="make-test-directory",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        directory_path="test",
    )

Install NPM Packages

To install npm packages for the specified workspace, use DataformInstallNpmPackagesOperator:

tests/system/google/cloud/dataform/example_dataform.py

    install_npm_packages = DataformInstallNpmPackagesOperator(
        task_id="install-npm-packages",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
    )
