Azure Data Factory Operators

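Azure Data Factory is Azure's cloud ETL service for scale-out serverless data integration and data transformation. It offers a code-free UI for intuitive authoring and single-pane-of-glass monitoring and management.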

AzureDataFactoryRunPipelineOperator

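Use the AzureDataFactoryRunPipelineOperator to execute a pipeline within a data factory. By default, the operator periodically checks the status of the pipeline run and completes once the run reaches a "Succeeded" status. To wait asynchronously instead, set wait_for_termination to False; this is typically done in combination with the AzureDataFactoryPipelineRunStatusSensor.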

Below is an example of using this operator to execute an Azure Data Factory pipeline.

tests/system/microsoft/azure/example_adf_run_pipeline.py [source]

    run_pipeline1 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline1",
        pipeline_name="pipeline1",
        parameters={"myParam": "value"},
    )
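The default waiting behavior described above can be pictured as a simple polling loop. The following is a minimal plain-Python sketch, not the operator's actual implementation: the function names `wait_for_pipeline_run` and `run_and_wait`, the injected `get_status` callable, and the default values are assumptions made for illustration. The parameter names `check_interval` and `timeout` mirror parameters the operator accepts.

```python
import time
from typing import Callable

# Status strings Azure Data Factory reports for a finished pipeline run.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Cancelled"}


def wait_for_pipeline_run(
    get_status: Callable[[], str],
    check_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until a terminal status is seen or the timeout elapses.

    Hypothetical helper: get_status stands in for a call to the Data Factory API.
    """
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout:
            raise TimeoutError(f"Pipeline run still {status!r} after {waited:.0f}s")
        # Block the caller (in Airflow terms, the worker slot) between checks.
        sleep(check_interval)
        waited += check_interval


def run_and_wait(get_status: Callable[[], str], **kwargs) -> None:
    """Treat any terminal status other than 'Succeeded' as a task failure."""
    status = wait_for_pipeline_run(get_status, **kwargs)
    if status != "Succeeded":
        raise RuntimeError(f"Pipeline run ended with status {status!r}")
```

Because the loop sleeps inside the calling process, the task occupies its worker slot for the whole run. That cost is what wait_for_termination=False and the deferrable mode are meant to avoid.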

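Below is an example of using this operator to execute an Azure Data Factory pipeline with the deferrable flag set, so that polling for the status of the pipeline run occurs on the Airflow Triggerer.

tests/system/microsoft/azure/example_adf_run_pipeline.py [source]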

    run_pipeline3 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline3",
        pipeline_name="pipeline1",
        parameters={"myParam": "value"},
        deferrable=True,
    )
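To illustrate why moving the polling to the Triggerer frees worker resources, here is a minimal asyncio sketch. It is not the provider's trigger code: `poll_until_done`, `poll_many`, the injected `get_status` coroutine, and the shape of the returned dictionary are all hypothetical. It only shows how a single event loop can watch many pipeline runs at once, because each poll yields control while it waits.

```python
import asyncio
from typing import Awaitable, Callable, Iterable

TERMINAL_STATUSES = {"Succeeded", "Failed", "Cancelled"}


async def poll_until_done(
    run_id: str,
    get_status: Callable[[str], Awaitable[str]],
    check_interval: float = 60.0,
) -> dict:
    """Poll one run without blocking the event loop; return an event-like dict."""
    while True:
        status = await get_status(run_id)
        if status in TERMINAL_STATUSES:
            return {"run_id": run_id, "status": status}
        # Yield control so the same process can poll other pending runs.
        await asyncio.sleep(check_interval)


async def poll_many(
    run_ids: Iterable[str],
    get_status: Callable[[str], Awaitable[str]],
    check_interval: float = 60.0,
) -> list:
    """Watch several runs concurrently on a single event loop."""
    tasks = [poll_until_done(run_id, get_status, check_interval) for run_id in run_ids]
    # gather() returns results in the same order as the input run_ids.
    return await asyncio.gather(*tasks)
```

In this picture the deferred task holds no worker slot while it waits; it resumes only when the polling coroutine reports a terminal status.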

Here is another example of using this operator to execute a pipeline, this time coupled with the AzureDataFactoryPipelineRunStatusSensor to perform an asynchronous wait.

tests/system/microsoft/azure/example_adf_run_pipeline.py [source]

    run_pipeline2 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline2",
        pipeline_name="pipeline2",
        wait_for_termination=False,
    )

    pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
    )

    # Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
    pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor_defered",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )

    pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_async_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )
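In the example above, the operator returns immediately and the sensor receives the run_id through XCom, then checks that run on each poke. The check can be sketched in plain Python as follows. This is an assumption-laden illustration rather than the sensor's source: `poke` here is a free function, `get_status` is an injected stand-in for the Data Factory API call, and `PipelineRunFailed` is a hypothetical exception name.

```python
from typing import Callable


class PipelineRunFailed(Exception):
    """Raised when the monitored run ends unsuccessfully (hypothetical name)."""


def poke(run_id: str, get_status: Callable[[str], str]) -> bool:
    """Return True once the run has succeeded, False while it is still pending.

    A failed or cancelled run raises instead of returning False, so the
    sensor task fails right away rather than poking until its own timeout.
    """
    status = get_status(run_id)
    if status == "Failed":
        raise PipelineRunFailed(f"Pipeline run {run_id} has failed.")
    if status == "Cancelled":
        raise PipelineRunFailed(f"Pipeline run {run_id} has been cancelled.")
    return status == "Succeeded"
```

Splitting the work this way keeps the triggering task short-lived; only the sensor waits, and with deferrable=True that wait moves off the worker as well.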

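If you want to free up the worker slot while the sensor is waiting, you can also run the AzureDataFactoryPipelineRunStatusSensor in deferrable mode by setting deferrable=True.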

Poll asynchronously for the status of a Data Factory pipeline run

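Use the AzureDataFactoryPipelineRunStatusAsyncSensor (the deferrable version) to periodically retrieve the status of a data factory pipeline run asynchronously. Because polling for the job status happens on the Airflow triggerer, this sensor frees up the worker slot and makes efficient use of Airflow's resources.

tests/system/microsoft/azure/example_adf_run_pipeline.py [source]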

    run_pipeline2 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline2",
        pipeline_name="pipeline2",
        wait_for_termination=False,
    )

    pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
    )

    # Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
    pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor_defered",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )

    pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_async_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )

Reference

For further information, please refer to the Microsoft documentation.
