Google DataFusion 操作符¶
Cloud Data Fusion 是一项完全托管的云原生数据集成服务,可帮助用户高效地构建和管理 ETL/ELT 数据管道。Cloud Data Fusion 提供图形界面以及包含预配置连接器和转换的广泛开源库,使组织能够将重心从代码和集成转移到洞察和行动上。
先决条件任务¶
要使用这些操作符,您必须完成以下几件事
使用 Cloud Console 选择或创建 Cloud Platform 项目。
为您的项目启用结算功能,详情请参阅 Google Cloud 文档。
启用 API,详情请参阅 Cloud Console 文档。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参见 安装。
重启 DataFusion 实例¶
要重启 Data Fusion 实例,请使用:CloudDataFusionRestartInstanceOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
restart_instance = CloudDataFusionRestartInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)
您可以将 Jinja 模板 与 instance_name
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
删除 DataFusion 实例¶
要删除 Data Fusion 实例,请使用:CloudDataFusionDeleteInstanceOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
delete_instance = CloudDataFusionDeleteInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
task_id="delete_instance",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以将 Jinja 模板 与 instance_name
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
创建 DataFusion 实例¶
要创建 Data Fusion 实例,请使用:CloudDataFusionCreateInstanceOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
create_instance = CloudDataFusionCreateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
task_id="create_instance",
)
您可以将 Jinja 模板 与 instance_name
, instance
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
更新 DataFusion 实例¶
要更新 Data Fusion 实例,请使用:CloudDataFusionUpdateInstanceOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
update_instance = CloudDataFusionUpdateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
update_mask="",
task_id="update_instance",
)
您可以将 Jinja 模板 与 instance_name
, instance
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
获取 DataFusion 实例¶
要检索 Data Fusion 实例,请使用:CloudDataFusionGetInstanceOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
get_instance = CloudDataFusionGetInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)
您可以将 Jinja 模板 与 instance_name
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
创建 DataFusion 管道¶
要创建 Data Fusion 管道,请使用:CloudDataFusionCreatePipelineOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
create_pipeline = CloudDataFusionCreatePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
pipeline=PIPELINE,
instance_name=INSTANCE_NAME,
task_id="create_pipeline",
)
您可以将 Jinja 模板 与 instance_name
, pipeline_name
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
启动 DataFusion 管道¶
要使用同步模式启动 Data Fusion 管道,请使用:CloudDataFusionStartPipelineOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
start_pipeline = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
pipeline_timeout=1000,
task_id="start_pipeline",
)
要使用异步模式启动 Data Fusion 管道,请使用:CloudDataFusionStartPipelineOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
start_pipeline_async = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
asynchronous=True,
task_id="start_pipeline_async",
)
可以使用可延迟模式异步启动 Data Fusion 管道。虽然异步参数可以使用同步的 sleep() 方法等待 DataFusion 管道达到终止状态,但可延迟模式使用异步调用检查状态。不能同时使用异步和可延迟参数。请查看使用可延迟模式的示例:CloudDataFusionStartPipelineOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
start_pipeline_def = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="start_pipeline_def",
deferrable=True,
)
您可以将 Jinja 模板 与 instance_name
, pipeline_name
, runtime_args
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
停止 DataFusion 管道¶
要停止 Data Fusion 管道,请使用:CloudDataFusionStopPipelineOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
stop_pipeline = CloudDataFusionStopPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="stop_pipeline",
)
您可以将 Jinja 模板 与 instance_name
, pipeline_name
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
删除 DataFusion 管道¶
要删除 Data Fusion 管道,请使用:CloudDataFusionDeletePipelineOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
delete_pipeline = CloudDataFusionDeletePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="delete_pipeline",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以将 Jinja 模板 与 instance_name
, version_id
, pipeline_name
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
列出 DataFusion 管道¶
要列出 Data Fusion 管道,请使用:CloudDataFusionListPipelinesOperator
。
tests/system/google/cloud/datafusion/example_datafusion.py
list_pipelines = CloudDataFusionListPipelinesOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)
您可以将 Jinja 模板 与 instance_name
, artifact_name
, artifact_version
, impersonation_chain
参数一起使用,以动态确定值。结果会保存到 XCom 中,其他操作符可以使用该结果。
传感器¶
当管道启动被异步触发时,可以使用传感器来运行检查并验证管道是否处于正确状态。
CloudDataFusionPipelineStateSensor
.
tests/system/google/cloud/datafusion/example_datafusion.py
start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
task_id="pipeline_state_sensor",
pipeline_name=PIPELINE_NAME,
pipeline_id=start_pipeline_async.output,
expected_statuses=["COMPLETED"],
failure_statuses=["FAILED"],
instance_name=INSTANCE_NAME,
location=LOCATION,
)