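Google Dataform Operators
Dataform is a service for data analysts to develop, test, version-control, and schedule complex SQL workflows for data transformation in BigQuery.
Dataform lets you manage data transformation in the Extract, Load, and Transform (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.
For more information about the service, see the Dataform documentation.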
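Configuration
Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, see the Dataform documentation.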
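Create Repository
To create a repository for tracking your code in the Dataform service, use DataformCreateRepositoryOperator. Example usage:
from airflow.providers.google.cloud.operators.dataform import DataformCreateRepositoryOperator

make_repository = DataformCreateRepositoryOperator(
    task_id="make-repository",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
)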
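Create Workspace
To create a workspace for storing your code in the Dataform service, use DataformCreateWorkspaceOperator. Example usage:
from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkspaceOperator

make_workspace = DataformCreateWorkspaceOperator(
    task_id="make-workspace",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)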
Create Compilation Result
To create a compilation result, use DataformCreateCompilationResultOperator. A simple configuration looks like this:
from airflow.providers.google.cloud.operators.dataform import DataformCreateCompilationResultOperator

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": "main",
        "workspace": (
            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
            f"workspaces/{WORKSPACE_ID}"
        ),
    },
)
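The workspace value above, like several later examples, refers to a Dataform resource by its full resource name. The following sketch builds such names with plain string formatting, following the projects/.../locations/.../repositories/.../workspaces/... pattern used in the compilation result example. The function names and the sample IDs are hypothetical and are not part of the Airflow provider.

```python
def repository_name(project_id: str, region: str, repository_id: str) -> str:
    """Return the full resource name of a Dataform repository."""
    return f"projects/{project_id}/locations/{region}/repositories/{repository_id}"


def workspace_name(project_id: str, region: str, repository_id: str, workspace_id: str) -> str:
    """Return the full resource name of a workspace inside a repository."""
    return f"{repository_name(project_id, region, repository_id)}/workspaces/{workspace_id}"


# Placeholder IDs for illustration only.
ws = workspace_name("my-project", "us-central1", "my-repo", "my-workspace")
print(ws)
# projects/my-project/locations/us-central1/repositories/my-repo/workspaces/my-workspace
```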
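Get Compilation Result
To get a compilation result, use DataformGetCompilationResultOperator. The example below pulls the full resource name of the compilation result created by the create-compilation-result task from XCom and keeps only its last path segment, which is the compilation result ID:
from airflow.providers.google.cloud.operators.dataform import DataformGetCompilationResultOperator

get_compilation_result = DataformGetCompilationResultOperator(
    task_id="get-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result_id=(
        "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
    ),
)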
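The Jinja expression above keeps the last segment of a full resource name. The sketch below shows the same extraction in plain Python, plus a helper that splits a name into collection/ID pairs. It assumes names follow an alternating collection/ID layout ending in something like .../compilationResults/<id>; the helper names and the sample value are hypothetical.

```python
def resource_id(name: str) -> str:
    """Return the last path segment, like the template's .split('/')[-1]."""
    return name.split("/")[-1]


def parse_resource_name(name: str) -> dict:
    """Split 'coll1/id1/coll2/id2/...' into {'coll1': 'id1', 'coll2': 'id2', ...}."""
    parts = name.split("/")
    if len(parts) % 2 != 0:
        raise ValueError(f"expected alternating collection/ID segments: {name!r}")
    # Even positions are collection names, odd positions are the IDs that follow them.
    return dict(zip(parts[0::2], parts[1::2]))


# Hypothetical value of xcom_pull('create-compilation-result')['name'].
name = "projects/my-project/locations/us-central1/repositories/my-repo/compilationResults/abc123"
print(resource_id(name))  # abc123
print(parse_resource_name(name)["repositories"])  # my-repo
```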
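Create Workflow Invocation
To create a workflow invocation, use DataformCreateWorkflowInvocationOperator. The compilation_result field takes the full resource name of a compilation result, here pulled from XCom:
from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkflowInvocationOperator

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)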
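You can run this operator in synchronous or asynchronous mode. With asynchronous=True, the operator does not wait for the invocation to finish; use the DataformWorkflowInvocationStateSensor to wait until the invocation reaches one of the expected_statuses:
from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkflowInvocationOperator

create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation-async",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
from google.cloud.dataform_v1beta1 import WorkflowInvocation

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is-workflow-invocation-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)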
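Conceptually, the state sensor keeps checking the invocation until its state falls into a given set. The following is a simplified, standalone sketch of such a poll loop, not the sensor's actual code: get_state is a hypothetical callable standing in for an API call, and states are plain strings named after WorkflowInvocation.State members. For the real sensor, Airflow manages the poke interval and timeout.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    expected: Iterable[str],
    failure: Iterable[str] = (),
    poke_interval: float = 1.0,
    timeout: float = 60.0,
) -> str:
    """Poll get_state() until it returns an expected state; raise on failure or timeout."""
    expected_set, failure_set = set(expected), set(failure)
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state in failure_set:
            raise RuntimeError(f"invocation reached failure state {state}")
        if state in expected_set:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still in state {state} after {timeout} s")
        time.sleep(poke_interval)


# Simulated states reported by successive polls of one invocation.
reported = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
final_state = wait_for_state(
    lambda: next(reported),
    expected={"SUCCEEDED"},
    failure={"FAILED", "CANCELLED"},
    poke_interval=0.0,
)
print(final_state)  # SUCCEEDED
```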
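There is also a sensor, DataformWorkflowInvocationActionStateSensor, that checks the state of a specific action in an asynchronously triggered workflow invocation. The action is selected with target_name; if it reaches one of the failure_statuses, the sensor fails instead of waiting further:
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationActionStateSensor
from google.cloud.dataform_v1beta1 import WorkflowInvocationAction

is_workflow_invocation_action_done = DataformWorkflowInvocationActionStateSensor(
    task_id="is-workflow-invocation-action-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    target_name="first_view",
    expected_statuses={WorkflowInvocationAction.State.SUCCEEDED},
    failure_statuses={
        WorkflowInvocationAction.State.SKIPPED,
        WorkflowInvocationAction.State.DISABLED,
        WorkflowInvocationAction.State.CANCELLED,
        WorkflowInvocationAction.State.FAILED,
    },
)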
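The action-level check can be sketched as a lookup by target name followed by a state test against the expected and failure sets. This sketch assumes each action is a dict holding a target name and a state string; that shape, the function name, and the choice to raise when no action matches the target are assumptions made for illustration, not the sensor's documented behavior.

```python
def action_done(actions, target_name, expected, failure):
    """Return True when the target action is in an expected state, False to keep waiting.

    Raises RuntimeError for a failure state and LookupError if no action has that target
    (both are illustrative choices for this sketch).
    """
    for action in actions:
        if action["target"]["name"] == target_name:
            state = action["state"]
            if state in failure:
                raise RuntimeError(f"action {target_name} ended in {state}")
            return state in expected
    raise LookupError(f"no action targets {target_name!r}")


# Hypothetical snapshot of an invocation's actions.
actions = [
    {"target": {"name": "first_view"}, "state": "RUNNING"},
    {"target": {"name": "second_view"}, "state": "SUCCEEDED"},
]
failure = {"SKIPPED", "DISABLED", "CANCELLED", "FAILED"}
print(action_done(actions, "first_view", {"SUCCEEDED"}, failure))  # False
```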
Get Workflow Invocation
To get a workflow invocation, use DataformGetWorkflowInvocationOperator.
from airflow.providers.google.cloud.operators.dataform import DataformGetWorkflowInvocationOperator

get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)
Query Workflow Invocation Actions
To query the actions of a workflow invocation, use DataformQueryWorkflowInvocationActionsOperator.
from airflow.providers.google.cloud.operators.dataform import DataformQueryWorkflowInvocationActionsOperator

query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
    task_id="query-workflow-invocation-actions",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)
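To make the result of such a query easier to inspect, you might summarize the actions by state. This sketch assumes the actions have been reduced to dicts with a target name and a state string; the real XCom payload may have a different shape (for example, integer enum values), so the field access would need adapting. The function name and sample records are hypothetical.

```python
from collections import Counter


def summarize_actions(actions):
    """Count actions per state and list the targets of failed actions."""
    counts = Counter(action["state"] for action in actions)
    failed = sorted(
        action["target"]["name"] for action in actions if action["state"] == "FAILED"
    )
    return counts, failed


# Hypothetical, simplified action records.
actions = [
    {"target": {"name": "first_view"}, "state": "SUCCEEDED"},
    {"target": {"name": "second_view"}, "state": "FAILED"},
    {"target": {"name": "third_table"}, "state": "SKIPPED"},
]
counts, failed = summarize_actions(actions)
print(dict(counts))  # {'SUCCEEDED': 1, 'FAILED': 1, 'SKIPPED': 1}
print(failed)  # ['second_view']
```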
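Cancel Workflow Invocation
To cancel a workflow invocation, use DataformCancelWorkflowInvocationOperator. In this example, the invocation ID is pulled from a separate task, create-workflow-invocation-for-cancel, which is not shown here:
from airflow.providers.google.cloud.operators.dataform import DataformCancelWorkflowInvocationOperator

cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id="cancel-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
    ),
)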
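Delete Repository
To delete a repository, use DataformDeleteRepositoryOperator. Setting trigger_rule=TriggerRule.ALL_DONE lets this cleanup task run once all upstream tasks have finished, even if some of them failed. Example usage:
from airflow.providers.google.cloud.operators.dataform import DataformDeleteRepositoryOperator
from airflow.utils.trigger_rule import TriggerRule

delete_repository = DataformDeleteRepositoryOperator(
    task_id="delete-repository",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)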
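Delete Workspace
To delete a workspace, use DataformDeleteWorkspaceOperator. Example usage:
from airflow.providers.google.cloud.operators.dataform import DataformDeleteWorkspaceOperator
from airflow.utils.trigger_rule import TriggerRule

delete_workspace = DataformDeleteWorkspaceOperator(
    task_id="delete-workspace",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)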
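Both delete examples set trigger_rule=TriggerRule.ALL_DONE so that cleanup still happens when earlier tasks fail. The toy model below contrasts that rule with Airflow's default ALL_SUCCESS rule. It uses a simplified, hypothetical set of terminal state names and is not Airflow's actual dependency-checking code.

```python
# Simplified terminal states; Airflow has more task states than these.
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def all_done(upstream_states):
    """Toy ALL_DONE: fire once every upstream task has finished, whatever the outcome."""
    return all(state in FINISHED for state in upstream_states)


def all_success(upstream_states):
    """Toy ALL_SUCCESS (the default rule): fire only if every upstream task succeeded."""
    return all(state == "success" for state in upstream_states)


upstream = ["success", "failed"]
print(all_done(upstream), all_success(upstream))  # True False
```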
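Remove File
To remove a file from a workspace, use DataformRemoveFileOperator. Example usage:
from airflow.providers.google.cloud.operators.dataform import DataformRemoveFileOperator

remove_test_file = DataformRemoveFileOperator(
    task_id="remove-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
)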
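Remove Directory
To remove a directory from a workspace, use DataformRemoveDirectoryOperator. Example usage:
from airflow.providers.google.cloud.operators.dataform import DataformRemoveDirectoryOperator

remove_test_directory = DataformRemoveDirectoryOperator(
    task_id="remove-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)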
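Initialize Workspace
To create the default project structure in a given workspace, use the make_initialization_workspace_flow helper. The repository and the workspace must already exist. The helper returns the first and last tasks of the initialization flow so that you can chain it with other tasks. Example usage:
from airflow.providers.google.cloud.utils.dataform import make_initialization_workspace_flow

first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    package_name=f"dataform_package_{ENV_ID}",
    without_installation=True,
    dataform_schema_name=DATAFORM_SCHEMA_NAME,
)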
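Write File to Workspace
To write a file with the given contents to the specified workspace, use DataformWriteFileOperator. The contents are passed as bytes:
from airflow.providers.google.cloud.operators.dataform import DataformWriteFileOperator

test_file_content = b"""
test test for test file
"""

write_test_file = DataformWriteFileOperator(
    task_id="make-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
    contents=test_file_content,
)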
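Because DataformWriteFileOperator takes the file contents as bytes, any text must be encoded first. As an example, the hypothetical helper below builds the source of a Dataform SQLX view (a config block followed by a SELECT statement) and encodes it as UTF-8. The helper name and the query are illustrative only; the file path you would write it to, for example something under definitions/, depends on your project layout.

```python
from typing import Optional


def sqlx_view(select_sql: str, schema: Optional[str] = None) -> bytes:
    """Build the source of a Dataform SQLX view and encode it as UTF-8 bytes."""
    options = ['type: "view"']
    if schema:
        options.append(f'schema: "{schema}"')
    # The config block comes first, then the query that defines the view.
    header = "config { " + ", ".join(options) + " }"
    return f"{header}\n\n{select_sql.strip()}\n".encode("utf-8")


contents = sqlx_view("SELECT 1 AS test")
print(contents)  # b'config { type: "view" }\n\nSELECT 1 AS test\n'
```

The resulting bytes could then be passed as contents= to DataformWriteFileOperator together with a .sqlx filepath of your choosing.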
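Make Directory in Workspace
To create a directory with the given path in the specified workspace, use DataformMakeDirectoryOperator.
from airflow.providers.google.cloud.operators.dataform import DataformMakeDirectoryOperator

make_test_directory = DataformMakeDirectoryOperator(
    task_id="make-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)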
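Install NPM Packages
To install npm packages in the specified workspace, use DataformInstallNpmPackagesOperator.
from airflow.providers.google.cloud.operators.dataform import DataformInstallNpmPackagesOperator

install_npm_packages = DataformInstallNpmPackagesOperator(
    task_id="install-npm-packages",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)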
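npm dependencies are typically declared in a package.json file at the workspace root. Assuming you manage that file yourself with DataformWriteFileOperator before installing, a minimal manifest could be built as shown below. The function name, the package name, and the @dataform/core version are placeholders, not values required by Dataform.

```python
import json

# Placeholder version; pin the @dataform/core release your project actually uses.
DATAFORM_CORE_VERSION = "3.0.0"


def package_json(package_name: str, core_version: str) -> bytes:
    """Build a minimal package.json that declares @dataform/core, as UTF-8 bytes."""
    manifest = {
        "name": package_name,
        "dependencies": {"@dataform/core": core_version},
    }
    return json.dumps(manifest, indent=2).encode("utf-8")


contents = package_json("example_dataform_package", DATAFORM_CORE_VERSION)
print(contents.decode("utf-8"))
```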