Google DataFusion 操作符

Cloud Data Fusion 是一项完全托管的云原生数据集成服务,可帮助用户高效地构建和管理 ETL/ELT 数据管道。Cloud Data Fusion 提供图形界面以及包含预配置连接器和转换的广泛开源库,使组织能够将重心从代码和集成转移到洞察和行动上。

先决条件任务

要使用这些操作符,您必须完成以下几件事

重启 DataFusion 实例

要重启 Data Fusion 实例,请使用:CloudDataFusionRestartInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

restart_instance = CloudDataFusionRestartInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)

您可以将 Jinja 模板instance_name, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

删除 DataFusion 实例

要删除 Data Fusion 实例,请使用:CloudDataFusionDeleteInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

delete_instance = CloudDataFusionDeleteInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    task_id="delete_instance",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以将 Jinja 模板instance_name, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

创建 DataFusion 实例

要创建 Data Fusion 实例,请使用:CloudDataFusionCreateInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

create_instance = CloudDataFusionCreateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    task_id="create_instance",
)

您可以将 Jinja 模板instance_name, instance, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

更新 DataFusion 实例

要更新 Data Fusion 实例,请使用:CloudDataFusionUpdateInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

update_instance = CloudDataFusionUpdateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    update_mask="",
    task_id="update_instance",
)

您可以将 Jinja 模板instance_name, instance, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

获取 DataFusion 实例

要检索 Data Fusion 实例,请使用:CloudDataFusionGetInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

get_instance = CloudDataFusionGetInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)

您可以将 Jinja 模板instance_name, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

创建 DataFusion 管道

要创建 Data Fusion 管道,请使用:CloudDataFusionCreatePipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

create_pipeline = CloudDataFusionCreatePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    pipeline=PIPELINE,
    instance_name=INSTANCE_NAME,
    task_id="create_pipeline",
)

您可以将 Jinja 模板instance_name, pipeline_name, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

启动 DataFusion 管道

要使用同步模式启动 Data Fusion 管道,请使用:CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    pipeline_timeout=1000,
    task_id="start_pipeline",
)

要使用异步模式启动 Data Fusion 管道,请使用:CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

start_pipeline_async = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    asynchronous=True,
    task_id="start_pipeline_async",
)

可以使用可延迟模式异步启动 Data Fusion 管道。虽然异步参数可以使用同步的 sleep() 方法等待 DataFusion 管道达到终止状态,但可延迟模式使用异步调用检查状态。不能同时使用异步和可延迟参数。请查看使用可延迟模式的示例:CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

start_pipeline_def = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline_def",
    deferrable=True,
)

您可以将 Jinja 模板instance_name, pipeline_name, runtime_args, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

停止 DataFusion 管道

要停止 Data Fusion 管道,请使用:CloudDataFusionStopPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

stop_pipeline = CloudDataFusionStopPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="stop_pipeline",
)

您可以将 Jinja 模板instance_name, pipeline_name, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

删除 DataFusion 管道

要删除 Data Fusion 管道,请使用:CloudDataFusionDeletePipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

delete_pipeline = CloudDataFusionDeletePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="delete_pipeline",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以将 Jinja 模板instance_name, version_id, pipeline_name, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

列出 DataFusion 管道

要列出 Data Fusion 管道,请使用:CloudDataFusionListPipelinesOperator

tests/system/google/cloud/datafusion/example_datafusion.py

list_pipelines = CloudDataFusionListPipelinesOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)

您可以将 Jinja 模板instance_name, artifact_name, artifact_version, impersonation_chain 参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。

传感器

当管道启动被异步触发时,可以使用传感器来运行检查并验证管道是否处于正确状态。

CloudDataFusionPipelineStateSensor.

tests/system/google/cloud/datafusion/example_datafusion.py

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="pipeline_state_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_async.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)

CloudDataFusionPipelineStateSensor.

此条目有帮助吗?