Google DataFusion 操作符

Cloud Data Fusion 是一项完全托管的、云原生的数据集成服务,可帮助用户高效构建和管理 ETL/ELT 数据管道。借助图形界面和广泛的预配置连接器和转换开源库,Cloud Data Fusion 将组织的重心从代码和集成转移到洞察和行动。

先决条件任务

要使用这些操作符,您必须执行以下几项操作

重启 DataFusion 实例

要重启 Data Fusion 实例,请使用:CloudDataFusionRestartInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

restart_instance = CloudDataFusionRestartInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)

您可以使用 Jinja 模板 以及 instance_nameimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

删除 DataFusion 实例

要删除 Data Fusion 实例,请使用:CloudDataFusionDeleteInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

delete_instance = CloudDataFusionDeleteInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    task_id="delete_instance",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以使用 Jinja 模板 以及 instance_nameimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

创建 DataFusion 实例

要创建 Data Fusion 实例,请使用:CloudDataFusionCreateInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

create_instance = CloudDataFusionCreateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    task_id="create_instance",
)

您可以使用 Jinja 模板 以及 instance_nameinstanceimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

更新 DataFusion 实例

要更新 Data Fusion 实例,请使用:CloudDataFusionUpdateInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

update_instance = CloudDataFusionUpdateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    update_mask="",
    task_id="update_instance",
)

您可以使用 Jinja 模板 以及 instance_nameinstanceimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

获取 DataFusion 实例

要检索 Data Fusion 实例,请使用:CloudDataFusionGetInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

get_instance = CloudDataFusionGetInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)

您可以使用 Jinja 模板 以及 instance_nameimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

创建 DataFusion 管道

要创建 Data Fusion 管道,请使用:CloudDataFusionCreatePipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

create_pipeline = CloudDataFusionCreatePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    pipeline=PIPELINE,
    instance_name=INSTANCE_NAME,
    task_id="create_pipeline",
)

您可以使用 Jinja 模板 以及 instance_namepipeline_nameimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

启动 DataFusion 管道

要使用同步模式启动 Data Fusion 管道,请使用:CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    pipeline_timeout=1000,
    task_id="start_pipeline",
)

要使用异步模式启动 Data Fusion 管道,请使用:CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

start_pipeline_async = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    asynchronous=True,
    task_id="start_pipeline_async",
)

可以使用可延迟模式异步启动 Data Fusion 管道。虽然异步参数可以使用同步 sleep() 方法等待 DataFusion 管道达到终止状态,但可延迟模式使用异步调用检查状态。不能同时使用异步和可延迟参数。请查看使用可延迟模式的示例:CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

start_pipeline_def = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline_def",
    deferrable=True,
)

您可以使用 Jinja 模板 以及 instance_namepipeline_nameruntime_argsimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

停止 DataFusion 管道

要停止 Data Fusion 管道,请使用:CloudDataFusionStopPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

stop_pipeline = CloudDataFusionStopPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="stop_pipeline",
)

您可以使用 Jinja 模板 以及 instance_namepipeline_nameimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

删除 DataFusion 管道

要删除 Data Fusion 管道,请使用:CloudDataFusionDeletePipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

delete_pipeline = CloudDataFusionDeletePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="delete_pipeline",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以使用 Jinja 模板 以及 instance_nameversion_idpipeline_nameimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

列出 DataFusion 管道

要列出 Data Fusion 管道,请使用:CloudDataFusionListPipelinesOperator

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

list_pipelines = CloudDataFusionListPipelinesOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)

您可以使用 Jinja 模板 以及 instance_nameartifact_nameartifact_versionimpersonation_chain 参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。

传感器

当异步触发管道启动时,可以使用传感器运行检查并验证管道是否处于正确状态。

CloudDataFusionPipelineStateSensor.

tests/system/google/cloud/datafusion/example_datafusion.py[源代码]

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="pipeline_state_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_async.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)

CloudDataFusionPipelineStateSensor.

此条目是否有帮助?