airflow.providers.google.cloud.hooks.dataflow

此模块包含一个 Google Dataflow Hook。

属性

DEFAULT_DATAFLOW_LOCATION

JOB_ID_PATTERN

T

DataflowJobStatus

Dataflow 作业状态的帮助类。

DataflowJobType

Dataflow 作业类型的帮助类。

DataflowHook

用于 Google Dataflow 的 Hook。

AsyncDataflowHook

Dataflow 服务的异步 hook 类。

函数

process_line_and_extract_dataflow_job_id_callback(...)

构建触发指定函数的回调。

模块内容

airflow.providers.google.cloud.hooks.dataflow.DEFAULT_DATAFLOW_LOCATION = 'us-central1'[源代码]
airflow.providers.google.cloud.hooks.dataflow.JOB_ID_PATTERN[源代码]
airflow.providers.google.cloud.hooks.dataflow.T[源代码]
airflow.providers.google.cloud.hooks.dataflow.process_line_and_extract_dataflow_job_id_callback(on_new_job_id_callback)[源代码]

构建触发指定函数的回调。

返回的回调函数旨在用作 BeamCommandRunner 中的 process_line_callback

参数:

on_new_job_id_callback (Callable[[str], None] | None) – 当作业 ID 已知时调用的回调函数

class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[源代码]

Dataflow 作业状态的帮助类。

参考: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

JOB_STATE_DONE = 'JOB_STATE_DONE'[源代码]
JOB_STATE_UNKNOWN = 'JOB_STATE_UNKNOWN'[源代码]
JOB_STATE_STOPPED = 'JOB_STATE_STOPPED'[源代码]
JOB_STATE_RUNNING = 'JOB_STATE_RUNNING'[源代码]
JOB_STATE_FAILED = 'JOB_STATE_FAILED'[源代码]
JOB_STATE_CANCELLED = 'JOB_STATE_CANCELLED'[源代码]
JOB_STATE_UPDATED = 'JOB_STATE_UPDATED'[源代码]
JOB_STATE_DRAINING = 'JOB_STATE_DRAINING'[源代码]
JOB_STATE_DRAINED = 'JOB_STATE_DRAINED'[源代码]
JOB_STATE_PENDING = 'JOB_STATE_PENDING'[源代码]
JOB_STATE_CANCELLING = 'JOB_STATE_CANCELLING'[源代码]
JOB_STATE_QUEUED = 'JOB_STATE_QUEUED'[源代码]
FAILED_END_STATES[源代码]
SUCCEEDED_END_STATES[源代码]
TERMINAL_STATES[源代码]
AWAITING_STATES[源代码]
class airflow.providers.google.cloud.hooks.dataflow.DataflowJobType[源代码]

Dataflow 作业类型的帮助类。

JOB_TYPE_UNKNOWN = 'JOB_TYPE_UNKNOWN'[源代码]
JOB_TYPE_BATCH = 'JOB_TYPE_BATCH'[源代码]
JOB_TYPE_STREAMING = 'JOB_TYPE_STREAMING'[源代码]
class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, expected_terminal_state=None, **kwargs)[源代码]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

用于 Google Dataflow 的 Hook。

此 Hook 中所有使用 project_id 的方法必须使用关键字参数而非位置参数调用。

poll_sleep = 10[源代码]
drain_pipeline = False[源代码]
cancel_timeout = 300[源代码]
wait_until_finished = None[源代码]
job_id: str | None = None[源代码]
beam_hook[源代码]
expected_terminal_state = None[源代码]
get_conn()[源代码]

返回一个 Google Cloud Dataflow 服务对象。

get_pipelines_conn()[源代码]

返回一个 Google Cloud Data Pipelines 服务对象。

start_template_dataflow(job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, on_new_job_id_callback=None, on_new_job_callback=None, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[源代码]

使用经典模板启动一个 Dataflow 作业并等待其完成。

参数:
  • job_name (str) – 作业的名称。

  • variables (dict) –

    作业运行时环境选项的映射。如果传递了 environment 参数,它将更新。

    参见

    有关可能配置的更多信息,请查阅 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) – 模板的参数

  • dataflow_template (str) – 模板的 GCS 路径。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • append_job_name (bool) – 如果需要在作业名称后附加唯一后缀,则为 True。

  • on_new_job_id_callback (Callable[[str], None] | None) – (已弃用)当作业已知时调用的回调函数。

  • on_new_job_callback (Callable[[dict], None] | None) – 当作业已知时调用的回调函数。

  • location (str) –

    作业位置。

    参见

    有关可能配置的更多信息,请查阅 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

launch_job_with_template(*, job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[源代码]

使用经典模板启动一个 Dataflow 作业,并退出而不等待其完成。

参数:
  • job_name (str) – 作业的名称。

  • variables (dict) –

    作业运行时环境选项的映射。如果传递了 environment 参数,它将更新。

    参见

    有关可能配置的更多信息,请查阅 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) – 模板的参数

  • dataflow_template (str) – 模板的 GCS 路径。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • append_job_name (bool) – 如果需要在作业名称后附加唯一后缀,则为 True。

  • location (str) –

    作业位置。

    参见

    有关可能配置的更多信息,请查阅 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

返回:

Dataflow 作业响应

返回类型:

dict[str, str]

send_launch_template_request(*, project_id, location, gcs_path, job_name, parameters, environment)[源代码]
start_flex_template(body, location, project_id, on_new_job_id_callback=None, on_new_job_callback=None)[源代码]

使用 Flex 模板启动一个 Dataflow 作业并等待其完成。

参数:
返回:

作业

返回类型:

dict[str, str]

launch_job_with_flex_template(body, location, project_id)[源代码]

使用 Flex 模板启动一个 Dataflow 作业,并退出而不等待作业完成。

参数:
返回:

Dataflow 作业响应

返回类型:

dict[str, str]

launch_beam_yaml_job(*, job_name, yaml_pipeline_file, append_job_name, jinja_variables, options, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源代码]

启动一个 Dataflow YAML 作业并运行直到完成。

参数:
  • job_name (str) – 分配给 Cloud Dataflow 作业的唯一名称。

  • yaml_pipeline_file (str) – 定义要运行的 YAML pipeline 的文件路径。必须是本地文件或以 ‘gs://’ 开头的 URL。

  • append_job_name (bool) – 如果必须在 job_name 后附加唯一后缀,则设置为 True。

  • jinja_variables (dict[str, str] | None) – 用于具体化 yaml pipeline 文件的 Jinja2 变量字典。

  • options (dict[str, Any] | None) – 额外的 gcloud 或 Beam 作业参数。它必须是一个字典,其键与 gcloud 中的可选标志名称匹配。支持的标志列表可以在以下链接找到:https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run。请注意,如果某个标志不需要值,则其字典值必须为 True 或 None。例如,–log-http 标志可以作为 {‘log-http’: True} 传递。

  • project_id (str) – 拥有该作业的 GCP 项目 ID。

  • location (str) – 作业区域端点的区域 ID。默认为 ‘us-central1’。

  • on_new_job_callback – 一旦知道作业后,将作业传递给 operator 的回调函数。

返回:

作业 ID。

返回类型:

str

static extract_job_id(job)[源代码]
static build_dataflow_job_name(job_name, append_job_name=True)[源代码]

构建 Dataflow 作业名称。

is_job_dataflow_running(name, project_id, location=None, variables=None)[源代码]

检查 Dataflow 作业是否仍在运行。

参数:
  • name (str) – 作业的名称。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • location (str | None) – 作业位置。

返回:

如果作业正在运行,则为 True。

返回类型:

bool

cancel_job(project_id, job_name=None, job_id=None, location=DEFAULT_DATAFLOW_LOCATION)[源代码]

使用指定的名称前缀或作业 ID 取消作业。

namejob_id 参数是互斥的。

参数:
  • job_name (str | None) – 指定要取消哪些作业的名称前缀。

  • job_id (str | None) – 指定要取消哪些作业的作业 ID。

  • location (str) – 作业位置。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

start_sql_job(job_name, query, options, project_id, location=DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback=None, on_new_job_callback=None)[source]

启动 Dataflow SQL 查询。

参数:
  • job_name (str) – 分配给 Cloud Dataflow 作业的唯一名称。

  • query (str) – 要执行的 SQL 查询。

  • options (dict[str, Any]) – 要执行的作业参数。更多信息,请参考:https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query 命令参考

  • location (str) – Dataflow 作业的位置(例如 europe-west1)

  • project_id (str) – 拥有该作业的 GCP 项目 ID。如果设置为 None 或缺失,则使用 GCP 连接中的默认 project_id。

  • on_new_job_id_callback (Callable[[str], None] | None) – (已弃用)当作业 ID 已知时调用的回调函数。

  • on_new_job_callback (Callable[[dict], None] | None) – 当作业已知时调用的回调函数。

返回:

新的作业对象

get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION)[source]

获取指定作业 ID 的作业。

参数:
返回:

作业

返回类型:

dict

fetch_job_metrics_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

获取指定作业 ID 的作业指标。

参数:
返回:

JobMetrics。请参考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics

返回类型:

dict

fetch_job_messages_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

获取指定作业 ID 的作业消息。

参数:
  • job_id (str) – 要获取的作业 ID。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • location (str) – 作业位置。

返回:

JobMessages 列表。请参考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage

返回类型:

list[dict]

fetch_job_autoscaling_events_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

获取指定作业 ID 的作业自动扩缩事件。

参数:
  • job_id (str) – 要获取的作业 ID。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • location (str) – 作业位置。

返回:

AutoscalingEvents 列表。请参考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent

返回类型:

list[dict]

wait_for_done(job_name, location, project_id, job_id=None, multiple_jobs=False)[source]

等待 Dataflow 作业完成。

参数:
  • job_name (str) – 执行 DataFlow 作业时使用的“jobName”(模板化)。这将最终在管道选项中设置,因此 options 中任何键为 'jobName' 的条目都将被覆盖。

  • location (str) – 作业运行的位置

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • job_id (str | None) – Dataflow 作业 ID

  • multiple_jobs (bool) – 如果管道创建多个作业,则监控所有作业

is_job_done(location, project_id, job_id)[source]

检查 Dataflow 作业是否已启动(对于流处理作业)或已完成(对于批处理作业)。

参数:
  • location (str) – 作业运行的位置

  • project_id (str) – 要在其中启动作业的 Google Cloud 项目 ID

  • job_id (str) – Dataflow 作业 ID

create_data_pipeline(body, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

创建一个新的 Dataflow Data Pipelines 实例。

参数:

以 JSON 格式返回创建的 Data Pipelines 实例。

get_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

检索 Dataflow Data Pipelines 实例。

参数:
  • pipeline_name (str) – 管道的显示名称。例如 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它就是 PIPELINE_ID。

  • project_id (str) – 拥有该作业的 GCP 项目 ID。

  • location (str) – Data Pipelines 实例的目标位置(例如 us-central1)。

以 JSON 格式返回创建的 Data Pipelines 实例。

run_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

运行 Dataflow Data Pipeline 实例。

参数:
  • pipeline_name (str) – 管道的显示名称。例如 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它就是 PIPELINE_ID。

  • project_id (str) – 拥有该作业的 GCP 项目 ID。

  • location (str) – Data Pipelines 实例的目标位置(例如 us-central1)。

以 JSON 格式返回创建的作业。

delete_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

删除 Dataflow Data Pipelines 实例。

参数:
  • pipeline_name (str) – 管道的显示名称。例如 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它就是 PIPELINE_ID。

  • project_id (str) – 拥有该作业的 GCP 项目 ID。

  • location (str) – Data Pipelines 实例的目标位置(例如 us-central1)。

以 JSON 格式返回创建的作业。

static build_parent_name(project_id, location)[source]
class airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook(**kwargs)[source]

基类:airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

Dataflow 服务的异步 hook 类。

sync_hook_class[source]
async initialize_client(client_class)[source]

初始化给定类的对象。

此方法用于初始化异步客户端。由于 Dataflow 服务使用了大量类,因此决定以相同的方式初始化它们,并使用从 GoogleBaseHook 类方法接收到的凭据。 :param client_class: Google Cloud SDK 的类

async get_project_id()[source]
async get_job(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]

获取指定作业 ID 的作业。

参数:
  • job_id (str) – 要获取的作业 ID。

  • project_id (str) – 要在其中启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • job_view (int) – 可选。JobView 对象,用于确定返回数据的表示形式

  • location (str) – 可选。Dataflow 作业的位置(例如 europe-west1)。请参考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]

获取指定作业 ID 的作业状态。

参数:
  • job_id (str) – 要获取的作业 ID。

  • project_id (str) – 要在其中启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • job_view (int) – 可选。JobView 对象,用于确定返回数据的表示形式

  • location (str) – 可选。Dataflow 作业的位置(例如 europe-west1)。请参考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

async list_jobs(jobs_filter=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, page_size=None, page_token=None)[source]

列出作业。

详情请参考:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.ListJobsRequest

参数:
  • jobs_filter (int | None) – 可选。此字段过滤并返回指定作业状态的作业。

  • project_id (str | None) – 可选。要启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • location (str | None) – 可选。Dataflow 作业的位置(例如 europe-west1)。

  • page_size (int | None) – 可选。如果有很多作业,则最多将响应限制为此数量。

  • page_token (str | None) – 可选。将其设置为先前响应的“next_page_token”字段,以请求长列表中的其他结果。

async list_job_messages(job_id, project_id=PROVIDE_PROJECT_ID, minimum_importance=JobMessageImportance.JOB_MESSAGE_BASIC, page_size=None, page_token=None, start_time=None, end_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

从 MessagesV1Beta3AsyncClient 返回 ListJobMessagesAsyncPager 对象。

此方法封装了 MessagesV1Beta3AsyncClient 的一个类似方法。ListJobMessagesAsyncPager 可以迭代以提取与特定作业 ID 关联的消息。

有关更多详细信息,请参阅 MessagesV1Beta3AsyncClient 方法说明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.messages_v1_beta3.MessagesV1Beta3AsyncClient

参数:
  • job_id (str) – 要获取消息的 Dataflow 作业 ID。

  • project_id (str | None) – 可选。要启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • minimum_importance (int) – 可选。过滤只获取重要性 >= level 的消息。有关更多详细信息,请参阅说明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.JobMessageImportance

  • page_size (int | None) – 可选。如果指定,则确定要返回的最大消息数。如果未指定,服务可能会选择适当的默认值,或返回任意大量结果。

  • page_token (str | None) – 可选。如果提供,这应该是先前调用返回的 next_page_token 的值。这将导致返回下一页结果。

  • start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。如果指定,则仅返回时间戳 >= start_time 的消息。默认值为作业创建时间(即消息的开始)。

  • end_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。如果指定,则仅返回时间戳 < end_time 的消息。默认值为当前时间。

  • location (str | None) – 可选。包含由 job_id 指定的作业的[区域端点](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。

async get_job_metrics(job_id, project_id=PROVIDE_PROJECT_ID, start_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

从 MetricsV1Beta3AsyncClient 返回 JobMetrics 对象。

此方法封装了 MetricsV1Beta3AsyncClient 的一个类似方法。

有关更多详细信息,请参阅 MetricsV1Beta3AsyncClient 方法说明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.metrics_v1_beta3.MetricsV1Beta3AsyncClient

参数:
  • job_id (str) – 要获取指标的 Dataflow 作业 ID。

  • project_id (str | None) – 可选。要启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。仅返回自此时间以来发生变化的指标数据。默认是返回关于作业所有指标的所有信息。

  • location (str | None) – 可选。包含由 job_id 指定的作业的[区域端点](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。

此条目有帮助吗?