Azure Data Factory Operators
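Azure Data Factory is Azure's cloud ETL service for scale-out serverless data integration and data transformation. It offers a code-free UI for intuitive authoring and single-pane-of-glass monitoring and management.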
AzureDataFactoryRunPipelineOperator
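Use the AzureDataFactoryRunPipelineOperator to execute a pipeline within a data factory. By default, the operator periodically checks the status of the executed pipeline and finishes once the status is "Succeeded". You can disable this behaviour by setting wait_for_termination to False, which allows an asynchronous wait, typically in combination with the AzureDataFactoryPipelineRunStatusSensor.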
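The default blocking wait can be pictured as a plain polling loop. The sketch below is not the provider's implementation: `wait_for_pipeline_run`, its `get_status` callback, and the default interval and timeout values are hypothetical, and a fake status feed replaces any Azure call. The status strings are the ones Azure Data Factory reports for pipeline runs. Treating "Failed" and "Cancelled" as errors is also an assumption about how a failed run should be surfaced.

```python
import time
from typing import Callable

# Azure Data Factory pipeline run statuses that end a run.
SUCCEEDED = "Succeeded"
TERMINAL_STATUSES = {"Succeeded", "Failed", "Cancelled"}


def wait_for_pipeline_run(
    get_status: Callable[[], str],
    check_interval: float = 60.0,
    timeout: float = 7 * 24 * 60 * 60,
) -> str:
    """Hypothetical helper: poll get_status() until the run reaches a terminal status."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == SUCCEEDED:
            return status
        if status in TERMINAL_STATUSES:
            # Failed or Cancelled: raise instead of returning quietly.
            raise RuntimeError(f"Pipeline run ended with status {status!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Pipeline run still {status!r} after {timeout} seconds")
        # Blocks the calling thread (in Airflow terms, the worker slot) between checks.
        time.sleep(check_interval)


# Usage with a fake status feed instead of a real Azure call.
statuses = iter(["Queued", "InProgress", "InProgress", "Succeeded"])
print(wait_for_pipeline_run(lambda: next(statuses), check_interval=0.01))  # → Succeeded
```

The `time.sleep` call is what makes this blocking. The deferrable and sensor-based variants below exist to avoid holding a worker slot during that sleep.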
Below is an example of using this operator to execute an Azure Data Factory pipeline.
tests/system/microsoft/azure/example_adf_run_pipeline.py
run_pipeline1 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline1",
    pipeline_name="pipeline1",
    parameters={"myParam": "value"},
)
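Below is an example of using this operator to execute an Azure Data Factory pipeline with the deferrable flag set, so that polling for the pipeline run status happens on the Airflow Triggerer.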
tests/system/microsoft/azure/example_adf_run_pipeline.py
run_pipeline3 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline3",
    pipeline_name="pipeline1",
    parameters={"myParam": "value"},
    deferrable=True,
)
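With deferrable=True the waiting moves into asynchronous code that runs on the triggerer. Here is a minimal sketch of such an async polling loop. The `poll_pipeline_run` coroutine, the fake async status function that stands in for an Azure client, and the layout of the returned event dict are all hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

TERMINAL_STATUSES = {"Succeeded", "Failed", "Cancelled"}


async def poll_pipeline_run(
    run_id: str,
    get_status: Callable[[str], Awaitable[str]],
    poke_interval: float = 60.0,
) -> Dict[str, Any]:
    """Hypothetical trigger-style coroutine: await status checks without blocking the loop."""
    while True:
        status = await get_status(run_id)
        if status in TERMINAL_STATUSES:
            outcome = "success" if status == "Succeeded" else "error"
            return {"status": outcome, "run_id": run_id, "message": f"Pipeline run {run_id} is {status}"}
        # asyncio.sleep hands control back to the event loop, so other coroutines keep running.
        await asyncio.sleep(poke_interval)


async def main() -> None:
    statuses = iter(["Queued", "InProgress", "Succeeded"])

    async def fake_get_status(run_id: str) -> str:
        # Stand-in for an async Azure client call (hypothetical).
        return next(statuses)

    event = await poll_pipeline_run("run-123", fake_get_status, poke_interval=0.01)
    print(event["status"], event["message"])  # → success Pipeline run run-123 is Succeeded


asyncio.run(main())
```

The key difference from a blocking loop is `await asyncio.sleep(...)`. While one run is waiting, the same event loop is free to service other waits.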
Here is a different example of using this operator to execute a pipeline, coupled with the AzureDataFactoryPipelineRunStatusSensor to perform an asynchronous wait.
tests/system/microsoft/azure/example_adf_run_pipeline.py
run_pipeline2 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline2",
    pipeline_name="pipeline2",
    wait_for_termination=False,
)
pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor",
    run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
)
# Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor_defered",
    run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
    deferrable=True,
)
pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_async_sensor",
    run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
    deferrable=True,
)
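With wait_for_termination=False the work splits in two. The operator only starts the run and publishes its run ID, and the sensor then checks that ID on each poke. The sketch below models this with a plain dict in place of XCom and a fake run store in place of Azure. `run_pipeline_no_wait`, `sensor_poke`, and the run ID format are hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory stand-in for XCom, keyed by (task_id, key).
xcom: Dict[Tuple[str, str], str] = {}

# Hypothetical stand-in for Azure: run_id -> statuses returned by successive checks.
fake_runs: Dict[str, List[str]] = {}


def run_pipeline_no_wait(task_id: str, pipeline_name: str) -> str:
    """Start a run and return at once, like wait_for_termination=False."""
    run_id = f"{pipeline_name}-run-1"  # hypothetical run ID format
    fake_runs[run_id] = ["InProgress", "InProgress", "Succeeded"]
    xcom[(task_id, "run_id")] = run_id  # publish the ID for downstream tasks
    return run_id


def sensor_poke(upstream_task_id: str) -> bool:
    """One sensor check: pull run_id from XCom and report whether the run succeeded."""
    run_id = xcom[(upstream_task_id, "run_id")]
    status = fake_runs[run_id].pop(0)  # next status from the fake feed
    if status in ("Failed", "Cancelled"):
        raise RuntimeError(f"Pipeline run {run_id} ended with status {status!r}")
    return status == "Succeeded"


run_pipeline_no_wait("run_pipeline2", "pipeline2")
pokes = 1
while not sensor_poke("run_pipeline2"):
    pokes += 1
print(pokes)  # → 3
```

Because the operator returns as soon as the run is started, the waiting cost moves entirely to the sensor. The sensor can then be made deferrable on its own.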
In addition, if you want to free up the worker slot while the sensor is running, you can use the AzureDataFactoryPipelineRunStatusSensor in deferrable mode.
tests/system/microsoft/azure/example_adf_run_pipeline.py
run_pipeline2 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline2",
    pipeline_name="pipeline2",
    wait_for_termination=False,
)
pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor",
    run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
)
# Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor_defered",
    run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
    deferrable=True,
)
pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_async_sensor",
    run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
    deferrable=True,
)
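In deferrable mode the sensor's work is split into a defer step and a resume step, and only the trigger runs in between. The sketch below is loosely modelled on Airflow's `defer(trigger=..., method_name=...)` and `execute_complete` pattern, but it is not Airflow code. `Deferral`, `PipelineRunSensorSketch`, `run_with_triggerer`, and the event dict layout are hypothetical, and `asyncio.run` plays the role of the triggerer.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Coroutine, Dict, List

Event = Dict[str, Any]


@dataclass
class Deferral:
    """Hypothetical marker: run `trigger` elsewhere, then call `method_name` with its event."""

    trigger: Coroutine[Any, Any, Event]
    method_name: str


class PipelineRunSensorSketch:
    """Hypothetical sensor mimicking the deferrable pattern; not the provider's class."""

    def __init__(self, run_id: str, statuses: List[str]) -> None:
        self.run_id = run_id
        self._statuses = iter(statuses)  # fake status feed standing in for Azure

    async def _trigger(self) -> Event:
        # In Airflow this part would run on the triggerer, not in a worker slot.
        for status in self._statuses:
            if status == "Succeeded":
                return {"status": "success", "run_id": self.run_id}
            if status in ("Failed", "Cancelled"):
                return {"status": "error", "run_id": self.run_id, "message": f"run {status}"}
            await asyncio.sleep(0)  # yield to the event loop between checks
        return {"status": "error", "run_id": self.run_id, "message": "no terminal status seen"}

    def execute(self) -> Deferral:
        # Worker step 1: hand off the wait and finish, releasing the slot.
        return Deferral(trigger=self._trigger(), method_name="execute_complete")

    def execute_complete(self, event: Event) -> str:
        # Worker step 2: runs only after the trigger has produced an event.
        if event["status"] == "error":
            raise RuntimeError(event["message"])
        return event["run_id"]


def run_with_triggerer(sensor: PipelineRunSensorSketch) -> str:
    """Hypothetical driver standing in for the worker/triggerer hand-off."""
    deferral = sensor.execute()
    event = asyncio.run(deferral.trigger)  # the "triggerer" awaits the coroutine
    return getattr(sensor, deferral.method_name)(event)


print(run_with_triggerer(PipelineRunSensorSketch("run-42", ["Queued", "InProgress", "Succeeded"])))  # → run-42
```

Nothing on the worker side is held between `execute` and `execute_complete`. That gap is where the worker slot is freed.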
Poll for status of a data factory pipeline run asynchronously
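Use the AzureDataFactoryPipelineRunStatusAsyncSensor (the deferrable version) to periodically retrieve the status of a data factory pipeline run asynchronously. This sensor frees up the worker slot because polling for the job status happens on the Airflow triggerer, which leads to more efficient use of resources within Airflow.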
tests/system/microsoft/azure/example_adf_run_pipeline.py
run_pipeline2 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline2",
    pipeline_name="pipeline2",
    wait_for_termination=False,
)
pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor",
    run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
)
# Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor_defered",
    run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
    deferrable=True,
)
pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_async_sensor",
    run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
    deferrable=True,
)
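Each deferred wait is just a suspended coroutine, so one event loop can watch many pipeline runs at once. That is where the resource saving comes from. The sketch below illustrates the effect with `asyncio.gather` over fake status feeds. `wait_for_run`, `watch_all`, and the run data are hypothetical, and no Azure calls are made.

```python
import asyncio
import time
from typing import Dict, List


async def wait_for_run(run_id: str, statuses: List[str], poke_interval: float) -> str:
    """Hypothetical async poller for one run; sleeps cooperatively between checks."""
    for status in statuses:
        if status in ("Succeeded", "Failed", "Cancelled"):
            return status
        await asyncio.sleep(poke_interval)
    return "Unknown"


async def watch_all(runs: Dict[str, List[str]], poke_interval: float) -> Dict[str, str]:
    """Watch every run concurrently on a single event loop, as a triggerer would."""
    results = await asyncio.gather(
        *(wait_for_run(run_id, statuses, poke_interval) for run_id, statuses in runs.items())
    )
    return dict(zip(runs, results))  # gather keeps input order


runs = {f"run-{i}": ["Queued", "InProgress", "InProgress", "Succeeded"] for i in range(100)}
start = time.monotonic()
final = asyncio.run(watch_all(runs, poke_interval=0.05))
elapsed = time.monotonic() - start
print(len(final), set(final.values()))  # → 100 {'Succeeded'}
# Each run sleeps three times, so elapsed should be near 0.15 s, not 100 x 0.15 s.
print(f"{elapsed:.2f}s")
```

The waits overlap instead of adding up. A single triggerer process can therefore track many pipeline runs that would otherwise each occupy a worker slot.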
Reference
For further information, please refer to the Microsoft documentation.