airflow.providers.google.cloud.operators.dataflow

此模块包含 Google Dataflow 运算符。

模块内容

CheckJobRunning

用于选择如果作业已经在运行时执行什么操作的辅助枚举。

DataflowConfiguration

用于 BeamRunJavaPipelineOperator 和 BeamRunPythonPipelineOperator 的 Dataflow 配置。

DataflowTemplatedJobStartOperator

使用经典模板启动 Dataflow 作业;操作的参数将传递给作业。

DataflowStartFlexTemplateOperator

使用 Flex 模板启动 Dataflow 作业。

DataflowStartSqlJobOperator

启动 Dataflow SQL 查询。

DataflowStartYamlJobOperator

启动 Dataflow YAML 作业并返回结果。

DataflowStopJobOperator

停止具有指定名称前缀或作业 ID 的作业。

DataflowCreatePipelineOperator

创建新的 Dataflow 数据管道实例。

DataflowRunPipelineOperator

运行 Dataflow 数据管道。

DataflowDeletePipelineOperator

删除 Dataflow 数据管道。

class airflow.providers.google.cloud.operators.dataflow.CheckJobRunning[源代码]

基类: enum.Enum

用于选择如果作业已经在运行时执行什么操作的辅助枚举。

IgnoreJob - 不检查是否正在运行 FinishIfRunning - 完成当前 dag 运行,不执行任何操作 WaitForRun - 等待作业完成,然后继续新作业

IgnoreJob = 1[源代码]
FinishIfRunning = 2[源代码]
WaitForRun = 3[源代码]
class airflow.providers.google.cloud.operators.dataflow.DataflowConfiguration(*, job_name=None, append_job_name=True, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, multiple_jobs=None, check_if_running=CheckJobRunning.WaitForRun, service_account=None)[源代码]

用于 BeamRunJavaPipelineOperator 和 BeamRunPythonPipelineOperator 的 Dataflow 配置。

参数
  • job_name (str | None) – 执行 Dataflow 作业时要使用的 'jobName' (已模板化)。这最终会设置在管道选项中,因此任何具有键 'jobName''job_name'``in ``options 的条目都将被覆盖。

  • append_job_name (bool) – 如果必须将唯一后缀附加到作业名称,则为 True。

  • project_id (str) – 可选,要在其中启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • location (str | None) – 作业位置。

  • gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。

  • poll_sleep (int) – 当作业处于 JOB_STATE_RUNNING 状态时,在轮询 Google Cloud Platform 以获取 dataflow 作业状态之间休眠的时间(以秒为单位)。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) –

    可选服务帐户,用于使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的帐户的链式列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户服务帐户令牌创建者 IAM 角色。如果设置为序列,则列表中标识必须向直接前面的标识授予服务帐户令牌创建者 IAM 角色,列表中的第一个帐户向原始帐户授予此角色(已模板化)。

    警告

    此选项需要 Apache Beam 2.39.0 或更高版本。

  • drain_pipeline (bool) – 可选,如果希望通过在终止任务实例期间排空而不是取消来停止流式处理作业,请设置为 True。请参阅:https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

  • cancel_timeout (int | None) – 当任务被终止时,运算符应等待管道成功取消的时间(以秒为单位)。(可选)默认为 300 秒

  • wait_until_finished (bool | None) –

    (可选)如果为 True,则在退出之前等待管道执行结束。如果为 False,则仅提交作业。如果为 None,则为默认行为。

    默认行为取决于管道的类型

    • 对于流式处理管道,请等待作业启动,

    • 对于批处理管道,请等待作业完成。

    警告

    您不能在管道代码中调用 PipelineResult.wait_until_finish 方法,以使运算符正常工作。即,您必须使用异步执行。否则,您的管道将始终等待完成。有关更多信息,请查看:异步执行

    在 Airflow 中启动 Dataflow 作业的过程包括两个步骤:* 运行子进程并读取作业 ID 的 stderr/stderr 日志。 * 循环等待上一步作业 ID 的结束,方法是检查其状态。

    步骤二在步骤一完成之后立即启动,因此如果您的管道代码中有 wait_until_finished,则步骤二将不会启动,直到该过程停止。当此过程停止时,步骤二将运行,但由于作业将处于终端状态,因此它只会执行一次迭代。

    如果在管道中没有调用 wait_for_pipeline 方法,但将 wait_until_finish=True 传递给运算符,则第二个循环将等待作业的终端状态。

    如果在管道中没有调用 wait_for_pipeline 方法,并将 wait_until_finish=False 传递给运算符,则第二个循环将检查一次,作业是否未处于终端状态并退出循环。

  • multiple_jobs (bool | None) – 如果管道创建多个作业,则监控所有作业。仅 BeamRunJavaPipelineOperator 支持此功能。

  • check_if_running (CheckJobRunning) – 在运行作业之前,验证之前的运行是否未在进行中。仅由以下项支持:BeamRunJavaPipelineOperator

  • service_account (str | None) – 将作业作为特定的服务帐户运行,而不是默认的 GCE 机器人。

template_fields: collections.abc.Sequence[str] = ('job_name', 'location')[source]
class airflow.providers.google.cloud.operators.dataflow.DataflowTemplatedJobStartOperator(*, template, project_id=PROVIDE_PROJECT_ID, job_name='{{task.task_id}}', options=None, dataflow_default_options=None, parameters=None, location=None, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, environment=None, cancel_timeout=10 * 60, wait_until_finished=None, append_job_name=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), expected_terminal_state=None, **kwargs)[source]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

使用经典模板启动 Dataflow 作业;操作的参数将传递给作业。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:模板化作业

参数
  • template (str) – 对 Dataflow 模板的引用。

  • job_name (str) – 执行 Dataflow 模板时使用的“jobName”(已模板化)。

  • options (dict[str, Any] | None) –

    作业运行时环境选项的映射。如果传递,它将更新环境参数。

    另请参阅

    有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • dataflow_default_options (dict[str, Any] | None) – 默认作业环境选项的映射。

  • parameters (dict[str, str] | None) – 模板的特定作业参数的映射。

  • project_id (str) – 可选,要在其中启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • location (str | None) – 作业位置。

  • gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。

  • poll_sleep (int) – 当作业处于 JOB_STATE_RUNNING 状态时,在轮询 Google Cloud Platform 以获取 dataflow 作业状态之间休眠的时间(以秒为单位)。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据模拟,或者用于获取列表中最后一个帐户的 access_token 的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予紧邻的前一个身份,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

  • environment (dict | None) –

    可选,作业运行时环境选项的映射。

    另请参阅

    有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • cancel_timeout (int | None) – 当任务被终止时,运算符应等待管道成功取消的时间(以秒为单位)。

  • append_job_name (bool) – 如果必须将唯一后缀附加到作业名称,则为 True。

  • wait_until_finished (bool | None) –

    (可选)如果为 True,则在退出之前等待管道执行结束。如果为 False,则仅提交作业。如果为 None,则为默认行为。

    默认行为取决于管道的类型

    • 对于流式处理管道,请等待作业启动,

    • 对于批处理管道,请等待作业完成。

    警告

    您不能在管道代码中调用 PipelineResult.wait_until_finish 方法,以使运算符正常工作。即,您必须使用异步执行。否则,您的管道将始终等待完成。有关更多信息,请查看:异步执行

    在 Airflow 中启动 Dataflow 作业的过程包括两个步骤

    • 运行子进程并读取作业 ID 的 stderr/stderr 日志。

    • 循环等待上一步中作业 ID 的结束。此循环检查作业的状态。

    步骤二在步骤一完成之后立即启动,因此如果您的管道代码中有 wait_until_finished,则步骤二将不会启动,直到该过程停止。当此过程停止时,步骤二将运行,但由于作业将处于终端状态,因此它只会执行一次迭代。

    如果在管道中没有调用 wait_for_pipeline 方法,但将 wait_until_finish=True 传递给运算符,则第二个循环将等待作业的终端状态。

    如果在管道中没有调用 wait_for_pipeline 方法,并将 wait_until_finish=False 传递给运算符,则第二个循环将检查一次,作业是否未处于终端状态并退出循环。

  • expected_terminal_state (str | None) – 运算符的预期终端状态,对应于 Airflow 任务成功时的状态。如果未指定,将由 hook 确定。

在 dag 的 default_args 中定义 dataflow_* 参数(如项目、区域和暂存位置)是一种很好的做法。

default_args = {
    "dataflow_default_options": {
        "zone": "europe-west1-d",
        "tempLocation": "gs://my-staging-bucket/staging/",
    }
}

您需要使用 template 参数传递您的 dataflow 模板的路径作为文件引用。使用 parameters 将参数传递给您的作业。使用 environment 将运行时环境变量传递给您的作业。

t1 = DataflowTemplatedJobStartOperator(
    task_id="dataflow_example",
    template="{{var.value.gcp_dataflow_base}}",
    parameters={
        "inputFile": "gs://bucket/input/my_input.txt",
        "outputFile": "gs://bucket/output/my_output.txt",
    },
    gcp_conn_id="airflow-conn-id",
    dag=my_dag,
)

templatedataflow_default_optionsparametersjob_name 都是模板化的,因此您可以在其中使用变量。

请注意,dataflow_default_options 应该保存项目信息的高级选项,这些选项适用于 DAG 中的所有 dataflow 运算符。

参数

deferrable (bool) – 在可延迟模式下运行运算符。

template_fields: collections.abc.Sequence[str] = ('template', 'job_name', 'options', 'parameters', 'project_id', 'location', 'gcp_conn_id',...[source]
ui_color = '#0273d4'[source]
hook()[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

在触发器完成其工作后执行。

on_kill()[source]

当任务实例被终止时,重写此方法以清理子进程。

在操作符内使用的任何 threading、subprocess 或 multiprocessing 模块都需要清理,否则会留下僵尸进程。

class airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator(body, location, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', drain_pipeline=False, cancel_timeout=10 * 60, wait_until_finished=None, impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), append_job_name=True, expected_terminal_state=None, poll_sleep=10, *args, **kwargs)[source]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

使用 Flex 模板启动 Dataflow 作业。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:模板作业

参数
  • body (dict) – 请求正文。请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body

  • location (str) – Dataflow 作业的位置(例如 europe-west1)

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。

  • gcp_conn_id (str) – 用于连接到 Google Cloud Platform 的连接 ID。

  • drain_pipeline (bool) – 可选,如果希望通过在终止任务实例期间排空而不是取消来停止流式处理作业,请设置为 True。请参阅:https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

  • cancel_timeout (int | None) – 当任务被终止时,运算符应等待管道成功取消的时间(以秒为单位)。

  • wait_until_finished (bool | None) –

    (可选)如果为 True,则在退出之前等待管道执行结束。如果为 False,则仅提交作业。如果为 None,则为默认行为。

    默认行为取决于管道的类型

    • 对于流式处理管道,请等待作业启动,

    • 对于批处理管道,请等待作业完成。

    警告

    您不能在管道代码中调用 PipelineResult.wait_until_finish 方法,以使运算符正常工作。即,您必须使用异步执行。否则,您的管道将始终等待完成。有关更多信息,请查看:异步执行

    在 Airflow 中启动 Dataflow 作业的过程包括两个步骤

    • 运行子进程并读取作业 ID 的 stderr/stderr 日志。

    • 循环等待上一步中作业 ID 的结束。此循环检查作业的状态。

    步骤二在步骤一完成之后立即启动,因此如果您的管道代码中有 wait_until_finished,则步骤二将不会启动,直到该过程停止。当此过程停止时,步骤二将运行,但由于作业将处于终端状态,因此它只会执行一次迭代。

    如果您在管道中不调用 wait_for_pipeline 方法,而是将 wait_until_finished=True 传递给操作符,则第二个循环将等待作业的终端状态。

    如果您在管道中不调用 wait_for_pipeline 方法,并将 wait_until_finished=False 传递给操作符,则第二个循环将检查一次作业是否未处于终端状态并退出循环。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据模拟,或者用于获取列表中最后一个帐户的 access_token 的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予紧邻的前一个身份,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

  • deferrable (bool) – 在可延迟模式下运行运算符。

  • expected_terminal_state (str | None) – 操作符的预期最终状态,当达到此状态时,相应的 Airflow 任务将成功。如果未指定,则将由 hook 确定。

  • append_job_name (bool) – 如果必须将唯一后缀附加到作业名称,则为 True。

  • poll_sleep (int) – 当作业处于 JOB_STATE_RUNNING 状态时,在轮询 Google Cloud Platform 以获取 dataflow 作业状态之间休眠的时间(以秒为单位)。

template_fields: collections.abc.Sequence[str] = ('body', 'location', 'project_id', 'gcp_conn_id')[source]
hook()[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

在触发器完成其工作后执行。

on_kill()[source]

当任务实例被终止时,重写此方法以清理子进程。

在操作符内使用的任何 threading、subprocess 或 multiprocessing 模块都需要清理,否则会留下僵尸进程。

class airflow.providers.google.cloud.operators.dataflow.DataflowStartSqlJobOperator(job_name, query, options, location=DEFAULT_DATAFLOW_LOCATION, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', drain_pipeline=False, impersonation_chain=None, *args, **kwargs)[source]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

启动 Dataflow SQL 查询。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:Dataflow SQL

警告

此操作符需要 Airflow 工作节点上安装 gcloud 命令(Google Cloud SDK) <https://cloud.google.com/sdk/docs/install>`__

参数
  • job_name (str) – 要分配给 Cloud Dataflow 作业的唯一名称。

  • query (str) – 要执行的 SQL 查询。

  • options (dict[str, Any]) –

    要执行的作业参数。它可以是一个包含以下键的字典。

    有关更多信息,请参阅:https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query 命令参考

  • location (str) – Dataflow 作业的位置(例如 europe-west1)

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。如果设置为 None 或缺失,则使用 GCP 连接中的默认 project_id。

  • gcp_conn_id (str) – 用于连接到 Google Cloud Platform 的连接 ID。

  • drain_pipeline (bool) – 可选,如果希望通过在终止任务实例期间排空而不是取消来停止流式处理作业,请设置为 True。请参阅:https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据模拟,或者用于获取列表中最后一个帐户的 access_token 的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予紧邻的前一个身份,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('job_name', 'query', 'options', 'location', 'project_id', 'gcp_conn_id')[source]
template_fields_renderers[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

on_kill()[source]

当任务实例被终止时,重写此方法以清理子进程。

在操作符内使用的任何 threading、subprocess 或 multiprocessing 模块都需要清理,否则会留下僵尸进程。

class airflow.providers.google.cloud.operators.dataflow.DataflowStartYamlJobOperator(*, job_name, yaml_pipeline_file, region=DEFAULT_DATAFLOW_LOCATION, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', append_job_name=True, drain_pipeline=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_sleep=10, cancel_timeout=5 * 60, expected_terminal_state=None, jinja_variables=None, options=None, impersonation_chain=None, **kwargs)[source]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

启动 Dataflow YAML 作业并返回结果。

另请参阅

有关如何使用此操作符的更多信息,请查看指南: Dataflow YAML

警告

此操作符需要 Airflow 工作节点上安装 gcloud 命令(Google Cloud SDK) <https://cloud.google.com/sdk/docs/install>`__

参数
  • job_name (str) – 必需。要分配给 Cloud Dataflow 作业的唯一名称。

  • yaml_pipeline_file (str) – 必需。定义要运行的 YAML 管道的文件的路径。必须是本地文件或以“gs://”开头的 URL。

  • region (str) – 可选。作业的区域端点的区域 ID。默认为“us-central1”。

  • project_id (str) – 必需。拥有该作业的 GCP 项目的 ID。如果设置为 None 或缺失,则使用 GCP 连接中的默认 project_id。

  • gcp_conn_id (str) – 可选。用于连接到 GCP 的连接 ID。

  • append_job_name (bool) – 可选。如果必须将唯一后缀附加到 job_name,则设置为 True。默认为 True。

  • drain_pipeline (bool) – 可选。如果要在杀死任务实例时通过排空(而非取消)来停止流式管道作业,则设置为 True。请注意,这不适用于批处理管道作业或可延迟模式。默认为 False。有关更多信息,请参阅: https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

  • deferrable (bool) – 可选。在可延迟模式下运行操作符。

  • expected_terminal_state (str | None) – 可选。Dataflow 作业的预期终端状态,操作符任务在该状态下设置为成功。对于批处理作业,默认为 “JOB_STATE_DONE”,对于流式作业,默认为 “JOB_STATE_RUNNING”。

  • poll_sleep (int) – 可选。轮询 Google Cloud Platform 获取 Dataflow 作业状态之间休眠的时间(以秒为单位)。用于同步和可延迟模式。

  • cancel_timeout (int | None) – 可选。当任务被杀死时,操作符应等待管道成功取消的时间(以秒为单位)。

  • jinja_variables (dict[str, str] | None) – 可选。用于实体化 yaml 管道文件的 Jinja2 变量字典。

  • options (dict[str, Any] | None) – 可选。其他 gcloud 或 Beam 作业参数。它必须是一个字典,其键与 gcloud 中的可选标志名称匹配。支持的标志列表可以在以下位置找到: https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run 。请注意,如果标志不需要值,则其字典值必须为 True 或 None。例如, –log-http 标志可以作为 {'log-http': True} 传递。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据模拟,或者用于获取列表中最后一个帐户的 access_token 的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予紧邻的前一个身份,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

Returns

包含作业数据的字典。

template_fields: collections.abc.Sequence[str] = ('job_name', 'yaml_pipeline_file', 'jinja_variables', 'options', 'region', 'project_id', 'gcp_conn_id')[source]
template_fields_renderers[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

在触发器返回事件后执行。

on_kill()[source]

如果任务实例被杀死,则取消 dataflow 作业。

如果任务实例在延迟状态下被杀死,则不会调用此方法。

hook()[源代码]
class airflow.providers.google.cloud.operators.dataflow.DataflowStopJobOperator(job_name_prefix=None, job_id=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, stop_timeout=10 * 60, drain_pipeline=True, **kwargs)[源代码]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

停止具有指定名称前缀或作业 ID 的作业。

将停止所有具有提供名称前缀的作业。默认情况下,将耗尽流式作业。

参数 job_name_prefixjob_id 是互斥的。

另请参阅

有关停止管道的更多详细信息,请参阅:https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

另请参阅

有关如何使用此操作符的更多信息,请查看指南:停止管道

参数
  • job_name_prefix (str | None) – 指定要停止的作业的名称前缀。

  • job_id (str | None) – 指定要停止的作业的作业 ID。

  • project_id (str) – 可选,要在其中启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • location (str) – 可选,作业位置。如果设置为 None 或缺失,将使用“us-central1”。

  • gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。

  • poll_sleep (int) – 轮询 Google Cloud Platform 以获取 Dataflow 作业状态以确认其已停止的间隔时间(秒)。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据模拟,或者用于获取列表中最后一个帐户的 access_token 的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予紧邻的前一个身份,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

  • drain_pipeline (bool) – 可选,如果想要通过取消而不是耗尽来停止流式作业,请设置为 False。请参阅:https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

  • stop_timeout (int | None) – 等待成功取消/耗尽作业的时间(秒)。

template_fields = ['job_id', 'project_id', 'impersonation_chain'][源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.dataflow.DataflowCreatePipelineOperator(*, body, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[源代码]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

创建新的 Dataflow 数据管道实例。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:JSON 格式的管道

参数
  • body (dict) – 请求正文(包含 Pipeline 的实例)。请参阅:https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create#request-body

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。

  • location (str) – 将 Data Pipelines 实例定向到的位置(例如 us-central1)。

  • gcp_conn_id (str) – 用于连接到 Google Cloud Platform 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) –

    可选服务帐户,用于使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的帐户的链式列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户服务帐户令牌创建者 IAM 角色。如果设置为序列,则列表中标识必须向直接前面的标识授予服务帐户令牌创建者 IAM 角色,列表中的第一个帐户向原始帐户授予此角色(已模板化)。

    警告

    此选项需要 Apache Beam 2.39.0 或更高版本。

以 JSON 表示形式返回创建的 Dataflow Data Pipeline 实例。

execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.dataflow.DataflowRunPipelineOperator(pipeline_name, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[源代码]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

运行 Dataflow 数据管道。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:JSON 格式的管道

参数
  • pipeline_name (str) – 管道的显示名称。例如,在 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它将是 PIPELINE_ID。

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。

  • location (str) – 将 Data Pipelines 实例定向到的位置(例如 us-central1)。

  • gcp_conn_id (str) – 用于连接到 Google Cloud Platform 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据模拟,或者用于获取列表中最后一个帐户的 access_token 的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予紧邻的前一个身份,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

以 JSON 表示形式返回创建的作业。

execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.dataflow.DataflowDeletePipelineOperator(pipeline_name, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

删除 Dataflow 数据管道。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:删除管道

参数
  • pipeline_name (str) – 管道的显示名称。例如,在 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它将是 PIPELINE_ID。

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。

  • location (str) – 将 Data Pipelines 实例定向到的位置(例如 us-central1)。

  • gcp_conn_id (str) – 用于连接到 Google Cloud Platform 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据模拟,或者用于获取列表中最后一个帐户的 access_token 的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予紧邻的前一个身份,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

execute(context)[source]

在创建操作符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

此条目是否有帮助?