airflow.providers.google.cloud.hooks.datafusion

此模块包含 Google DataFusion Hook。

属性

Operation

FAILURE_STATES

SUCCESS_STATES

异常

ConflictException

捕获 409 错误的异常。

PipelineStates

Data Fusion 流水线状态。

DataFusionHook

Google DataFusion 的 Hook。

DataFusionAsyncHook

获取 DataFusion 异步 Hook 的类。

模块内容

airflow.providers.google.cloud.hooks.datafusion.Operation[源代码]
exception airflow.providers.google.cloud.hooks.datafusion.ConflictException[源代码]

基类: airflow.exceptions.AirflowException

捕获 409 错误的异常。

class airflow.providers.google.cloud.hooks.datafusion.PipelineStates[源代码]

Data Fusion 流水线状态。

PENDING = 'PENDING'[源代码]
STARTING = 'STARTING'[源代码]
RUNNING = 'RUNNING'[源代码]
SUSPENDED = 'SUSPENDED'[源代码]
RESUMING = 'RESUMING'[源代码]
COMPLETED = 'COMPLETED'[源代码]
FAILED = 'FAILED'[源代码]
KILLED = 'KILLED'[源代码]
REJECTED = 'REJECTED'[源代码]
airflow.providers.google.cloud.hooks.datafusion.FAILURE_STATES[源代码]
airflow.providers.google.cloud.hooks.datafusion.SUCCESS_STATES[源代码]
class airflow.providers.google.cloud.hooks.datafusion.DataFusionHook(api_version='v1beta1', gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[源代码]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

Google DataFusion 的 Hook。

api_version = 'v1beta1'[源代码]
wait_for_operation(operation)[源代码]

等待长时间运行的操作完成。

wait_for_pipeline_state(pipeline_name, pipeline_id, instance_url, pipeline_type=DataFusionPipelineType.BATCH, namespace='default', success_states=None, failure_states=None, timeout=5 * 60)[源代码]

轮询流水线状态,如果状态失败或超时则抛出异常。

get_conn()[源代码]

获取 DataFusion 连接。

restart_instance(instance_name, location, project_id)[源代码]

重启单个 Data Fusion 实例。

操作完成后,实例将完全重启。

参数:
  • instance_name (str) – 要重启的实例名称。

  • location (str) – 处理请求的 Cloud Data Fusion 位置。

  • project_id (str) – 实例所属的 Google Cloud 项目 ID。

delete_instance(instance_name, location, project_id)[源代码]

删除单个 Data Fusion 实例。

参数:
  • instance_name (str) – 要删除的实例名称。

  • location (str) – 处理请求的 Cloud Data Fusion 位置。

  • project_id (str) – 实例所属的 Google Cloud 项目 ID。

create_instance(instance_name, instance, location, project_id=PROVIDE_PROJECT_ID)[源代码]

在指定的项目和位置创建新的 Data Fusion 实例。

参数:
get_instance(instance_name, location, project_id)[源代码]

获取单个 Data Fusion 实例的详情。

参数:
  • instance_name (str) – 实例名称。

  • location (str) – 处理请求的 Cloud Data Fusion 位置。

  • project_id (str) – 实例所属的 Google Cloud 项目 ID。

get_instance_artifacts(instance_url, namespace='default', scope='SYSTEM')[源代码]
patch_instance(instance_name, instance, update_mask, location, project_id=PROVIDE_PROJECT_ID)[源代码]

更新单个 Data Fusion 实例。

参数:
create_pipeline(pipeline_name, pipeline, instance_url, namespace='default')[源代码]

创建批量 Cloud Data Fusion 流水线。

参数:
delete_pipeline(pipeline_name, instance_url, version_id=None, namespace='default')[源代码]

删除批量 Cloud Data Fusion 流水线。

参数:
  • pipeline_name (str) – 您的流水线名称。

  • version_id (str | None) – 要删除的流水线版本

  • instance_url (str) – 实例上可访问 REST API 的端点。

  • namespace (str) – 如果您的流水线属于基础版实例,则命名空间 ID 始终为 default。如果您的流水线属于企业版实例,则可以创建命名空间。

list_pipelines(instance_url, artifact_name=None, artifact_version=None, namespace='default')[源代码]

列出 Cloud Data Fusion 流水线。

参数:
  • artifact_version (str | None) – 用于过滤实例的工件版本

  • artifact_name (str | None) – 用于过滤实例的工件名称

  • instance_url (str) – 实例上可访问 REST API 的端点。

  • namespace (str) – 如果您的流水线属于基础版实例,则命名空间 ID 始终为 default。如果您的流水线属于企业版实例,则可以创建命名空间。

get_pipeline_workflow(pipeline_name, instance_url, pipeline_id, pipeline_type=DataFusionPipelineType.BATCH, namespace='default')[源代码]
start_pipeline(pipeline_name, instance_url, pipeline_type=DataFusionPipelineType.BATCH, namespace='default', runtime_args=None)[源代码]

启动 Cloud Data Fusion 流水线。适用于批量和流式流水线。

参数:
  • pipeline_name (str) – 您的流水线名称。

  • pipeline_type (airflow.providers.google.cloud.utils.datafusion.DataFusionPipelineType) – 可选的流水线类型(默认为 BATCH)。

  • instance_url (str) – 实例上可访问 REST API 的端点。

  • runtime_args (dict[str, Any] | None) – 要传递给流水线的可选运行时 JSON 参数

  • namespace (str) – 如果您的流水线属于基础版实例,则命名空间 ID 始终为 default。如果您的流水线属于企业版实例,则可以创建命名空间。

stop_pipeline(pipeline_name, instance_url, namespace='default')[源代码]

停止 Cloud Data Fusion 流水线。适用于批量和流式流水线。

参数:
  • pipeline_name (str) – 您的流水线名称。

  • instance_url (str) – 实例上可访问 REST API 的端点。

  • namespace (str) – 如果您的流水线属于基础版实例,则命名空间 ID 始终为 default。如果您的流水线属于企业版实例,则可以创建命名空间。

static cdap_program_type(pipeline_type)[源代码]

根据流水线类型检索 CDAP 程序类型。

参数:

pipeline_type (airflow.providers.google.cloud.utils.datafusion.DataFusionPipelineType) – 流水线类型。

static cdap_program_id(pipeline_type)[源代码]

根据流水线类型检索 CDAP 程序 ID。

参数:

pipeline_type (airflow.providers.google.cloud.utils.datafusion.DataFusionPipelineType) – 流水线类型。

class airflow.providers.google.cloud.hooks.datafusion.DataFusionAsyncHook(**kwargs)[源代码]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

获取 DataFusion 异步 Hook 的类。

sync_hook_class[源代码]
scopes = ['https://www.googleapis.com/auth/cloud-platform'][源代码]
async get_pipeline(instance_url, namespace, pipeline_name, pipeline_id, session, pipeline_type=DataFusionPipelineType.BATCH)[源代码]
async get_pipeline_status(pipeline_name, instance_url, pipeline_id, pipeline_type=DataFusionPipelineType.BATCH, namespace='default', success_states=None)[源代码]

异步获取 Cloud Data Fusion 流水线状态。

参数:
  • pipeline_name (str) – 您的流水线名称。

  • instance_url (str) – 实例上可访问 REST API 的端点。

  • pipeline_id (str) – 与特定流水线关联的唯一流水线 ID。

  • pipeline_type (airflow.providers.google.cloud.utils.datafusion.DataFusionPipelineType) – 可选的流水线类型(默认为 batch)。

  • namespace (str) – 如果您的流水线属于基础版实例,则命名空间 ID 始终为 default。如果您的流水线属于企业版实例,则可以创建命名空间。

  • success_states (list[str] | None) – 如果提供,operator 将等待流水线进入提供的状态之一。

本条目是否有帮助?