airflow.providers.google.cloud.hooks.dataflow¶
此模块包含一个 Google Dataflow Hook。
属性¶
类¶
Dataflow 作业状态的帮助类。 |
|
Dataflow 作业类型的帮助类。 |
|
用于 Google Dataflow 的 Hook。 |
|
Dataflow 服务的异步 hook 类。 |
函数¶
构建触发指定函数的回调。 |
模块内容¶
- airflow.providers.google.cloud.hooks.dataflow.process_line_and_extract_dataflow_job_id_callback(on_new_job_id_callback)[源代码]¶
构建触发指定函数的回调。
返回的回调函数旨在用作
BeamCommandRunner
中的process_line_callback
。- 参数:
on_new_job_id_callback (Callable[[str], None] | None) – 当作业 ID 已知时调用的回调函数
- class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[源代码]¶
Dataflow 作业状态的帮助类。
参考: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
- class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, expected_terminal_state=None, **kwargs)[源代码]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook
用于 Google Dataflow 的 Hook。
此 Hook 中所有使用 project_id 的方法必须使用关键字参数而非位置参数调用。
- start_template_dataflow(job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, on_new_job_id_callback=None, on_new_job_callback=None, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[源代码]¶
使用经典模板启动一个 Dataflow 作业并等待其完成。
- 参数:
job_name (str) – 作业的名称。
variables (dict) –
作业运行时环境选项的映射。如果传递了 environment 参数,它将更新。
参见
有关可能配置的更多信息,请查阅 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
parameters (dict) – 模板的参数
dataflow_template (str) – 模板的 GCS 路径。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
append_job_name (bool) – 如果需要在作业名称后附加唯一后缀,则为 True。
on_new_job_id_callback (Callable[[str], None] | None) – (已弃用)当作业已知时调用的回调函数。
on_new_job_callback (Callable[[dict], None] | None) – 当作业已知时调用的回调函数。
location (str) –
作业位置。
参见
有关可能配置的更多信息,请查阅 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- launch_job_with_template(*, job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[源代码]¶
使用经典模板启动一个 Dataflow 作业,并退出而不等待其完成。
- 参数:
job_name (str) – 作业的名称。
variables (dict) –
作业运行时环境选项的映射。如果传递了 environment 参数,它将更新。
参见
有关可能配置的更多信息,请查阅 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
parameters (dict) – 模板的参数
dataflow_template (str) – 模板的 GCS 路径。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
append_job_name (bool) – 如果需要在作业名称后附加唯一后缀,则为 True。
location (str) –
作业位置。
参见
有关可能配置的更多信息,请查阅 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- 返回:
Dataflow 作业响应
- 返回类型:
- send_launch_template_request(*, project_id, location, gcs_path, job_name, parameters, environment)[源代码]¶
- start_flex_template(body, location, project_id, on_new_job_id_callback=None, on_new_job_callback=None)[源代码]¶
使用 Flex 模板启动一个 Dataflow 作业并等待其完成。
- 参数:
body (dict) – 请求体。参见: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
location (str) – Dataflow 作业的位置(例如 europe-west1)
project_id (str) – 拥有该作业的 GCP 项目 ID。如果设置为
None
或缺失,则使用 GCP 连接中的默认 project_id。on_new_job_id_callback (Callable[[str], None] | None) – (已弃用)检测到作业 ID 时调用的回调函数。
on_new_job_callback (Callable[[dict], None] | None) – 检测到作业时调用的回调函数。
- 返回:
作业
- 返回类型:
- launch_job_with_flex_template(body, location, project_id)[源代码]¶
使用 Flex 模板启动一个 Dataflow 作业,并退出而不等待作业完成。
- 参数:
body (dict) – 请求体。参见: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
location (str) – Dataflow 作业的位置(例如 europe-west1)
project_id (str) – 拥有该作业的 GCP 项目 ID。如果设置为
None
或缺失,则使用 GCP 连接中的默认 project_id。
- 返回:
Dataflow 作业响应
- 返回类型:
- launch_beam_yaml_job(*, job_name, yaml_pipeline_file, append_job_name, jinja_variables, options, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源代码]¶
启动一个 Dataflow YAML 作业并运行直到完成。
- 参数:
job_name (str) – 分配给 Cloud Dataflow 作业的唯一名称。
yaml_pipeline_file (str) – 定义要运行的 YAML pipeline 的文件路径。必须是本地文件或以 ‘gs://’ 开头的 URL。
append_job_name (bool) – 如果必须在 job_name 后附加唯一后缀,则设置为 True。
jinja_variables (dict[str, str] | None) – 用于具体化 yaml pipeline 文件的 Jinja2 变量字典。
options (dict[str, Any] | None) – 额外的 gcloud 或 Beam 作业参数。它必须是一个字典,其键与 gcloud 中的可选标志名称匹配。支持的标志列表可以在以下链接找到:https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run。请注意,如果某个标志不需要值,则其字典值必须为 True 或 None。例如,–log-http 标志可以作为 {‘log-http’: True} 传递。
project_id (str) – 拥有该作业的 GCP 项目 ID。
location (str) – 作业区域端点的区域 ID。默认为 ‘us-central1’。
on_new_job_callback – 一旦知道作业后,将作业传递给 operator 的回调函数。
- 返回:
作业 ID。
- 返回类型:
- is_job_dataflow_running(name, project_id, location=None, variables=None)[源代码]¶
检查 Dataflow 作业是否仍在运行。
- cancel_job(project_id, job_name=None, job_id=None, location=DEFAULT_DATAFLOW_LOCATION)[源代码]¶
使用指定的名称前缀或作业 ID 取消作业。
name
和job_id
参数是互斥的。
- start_sql_job(job_name, query, options, project_id, location=DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback=None, on_new_job_callback=None)[source]¶
启动 Dataflow SQL 查询。
- 参数:
job_name (str) – 分配给 Cloud Dataflow 作业的唯一名称。
query (str) – 要执行的 SQL 查询。
options (dict[str, Any]) – 要执行的作业参数。更多信息,请参考:https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query 命令参考
location (str) – Dataflow 作业的位置(例如 europe-west1)
project_id (str) – 拥有该作业的 GCP 项目 ID。如果设置为
None
或缺失,则使用 GCP 连接中的默认 project_id。on_new_job_id_callback (Callable[[str], None] | None) – (已弃用)当作业 ID 已知时调用的回调函数。
on_new_job_callback (Callable[[dict], None] | None) – 当作业已知时调用的回调函数。
- 返回:
新的作业对象
- get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
获取指定作业 ID 的作业。
- 参数:
job_id (str) – 要获取的作业 ID。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
location (str) – Dataflow 作业的位置(例如 europe-west1)。请参考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- 返回:
作业
- 返回类型:
- fetch_job_metrics_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
获取指定作业 ID 的作业指标。
- 参数:
job_id (str) – 要获取的作业 ID。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
location (str) – Dataflow 作业的位置(例如 europe-west1)。请参考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- 返回:
JobMetrics。请参考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics
- 返回类型:
- fetch_job_messages_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
获取指定作业 ID 的作业消息。
- fetch_job_autoscaling_events_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
获取指定作业 ID 的作业自动扩缩事件。
- wait_for_done(job_name, location, project_id, job_id=None, multiple_jobs=False)[source]¶
等待 Dataflow 作业完成。
- 参数:
job_name (str) – 执行 DataFlow 作业时使用的“jobName”(模板化)。这将最终在管道选项中设置,因此
options
中任何键为'jobName'
的条目都将被覆盖。location (str) – 作业运行的位置
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
job_id (str | None) – Dataflow 作业 ID
multiple_jobs (bool) – 如果管道创建多个作业,则监控所有作业
- create_data_pipeline(body, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
创建一个新的 Dataflow Data Pipelines 实例。
- 参数:
body (dict) – 请求体(包含 Pipeline 实例)。请参考:https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create#request-body
project_id (str) – 拥有该作业的 GCP 项目 ID。
location (str) – Data Pipelines 实例的目标位置(例如 us-central1)。
以 JSON 格式返回创建的 Data Pipelines 实例。
- get_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
检索 Dataflow Data Pipelines 实例。
- 参数:
以 JSON 格式返回创建的 Data Pipelines 实例。
- run_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
运行 Dataflow Data Pipeline 实例。
- 参数:
以 JSON 格式返回创建的作业。
- class airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook(**kwargs)[source]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
Dataflow 服务的异步 hook 类。
- async initialize_client(client_class)[source]¶
初始化给定类的对象。
此方法用于初始化异步客户端。由于 Dataflow 服务使用了大量类,因此决定以相同的方式初始化它们,并使用从 GoogleBaseHook 类方法接收到的凭据。 :param client_class: Google Cloud SDK 的类
- async get_job(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
获取指定作业 ID 的作业。
- 参数:
job_id (str) – 要获取的作业 ID。
project_id (str) – 要在其中启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
job_view (int) – 可选。JobView 对象,用于确定返回数据的表示形式
location (str) – 可选。Dataflow 作业的位置(例如 europe-west1)。请参考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
获取指定作业 ID 的作业状态。
- 参数:
job_id (str) – 要获取的作业 ID。
project_id (str) – 要在其中启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
job_view (int) – 可选。JobView 对象,用于确定返回数据的表示形式
location (str) – 可选。Dataflow 作业的位置(例如 europe-west1)。请参考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- async list_jobs(jobs_filter=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, page_size=None, page_token=None)[source]¶
列出作业。
- 参数:
jobs_filter (int | None) – 可选。此字段过滤并返回指定作业状态的作业。
project_id (str | None) – 可选。要启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
location (str | None) – 可选。Dataflow 作业的位置(例如 europe-west1)。
page_size (int | None) – 可选。如果有很多作业,则最多将响应限制为此数量。
page_token (str | None) – 可选。将其设置为先前响应的“next_page_token”字段,以请求长列表中的其他结果。
- async list_job_messages(job_id, project_id=PROVIDE_PROJECT_ID, minimum_importance=JobMessageImportance.JOB_MESSAGE_BASIC, page_size=None, page_token=None, start_time=None, end_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
从 MessagesV1Beta3AsyncClient 返回 ListJobMessagesAsyncPager 对象。
此方法封装了 MessagesV1Beta3AsyncClient 的一个类似方法。ListJobMessagesAsyncPager 可以迭代以提取与特定作业 ID 关联的消息。
有关更多详细信息,请参阅 MessagesV1Beta3AsyncClient 方法说明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.messages_v1_beta3.MessagesV1Beta3AsyncClient
- 参数:
job_id (str) – 要获取消息的 Dataflow 作业 ID。
project_id (str | None) – 可选。要启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
minimum_importance (int) – 可选。过滤只获取重要性 >= level 的消息。有关更多详细信息,请参阅说明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.JobMessageImportance
page_size (int | None) – 可选。如果指定,则确定要返回的最大消息数。如果未指定,服务可能会选择适当的默认值,或返回任意大量结果。
page_token (str | None) – 可选。如果提供,这应该是先前调用返回的 next_page_token 的值。这将导致返回下一页结果。
start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。如果指定,则仅返回时间戳 >= start_time 的消息。默认值为作业创建时间(即消息的开始)。
end_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。如果指定,则仅返回时间戳 < end_time 的消息。默认值为当前时间。
location (str | None) – 可选。包含由 job_id 指定的作业的[区域端点](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。
- async get_job_metrics(job_id, project_id=PROVIDE_PROJECT_ID, start_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
从 MetricsV1Beta3AsyncClient 返回 JobMetrics 对象。
此方法封装了 MetricsV1Beta3AsyncClient 的一个类似方法。
有关更多详细信息,请参阅 MetricsV1Beta3AsyncClient 方法说明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.metrics_v1_beta3.MetricsV1Beta3AsyncClient
- 参数:
job_id (str) – 要获取指标的 Dataflow 作业 ID。
project_id (str | None) – 可选。要启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。仅返回自此时间以来发生变化的指标数据。默认是返回关于作业所有指标的所有信息。
location (str | None) – 可选。包含由 job_id 指定的作业的[区域端点](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。