Google Cloud Dataflow 运算符

Dataflow 是一项用于执行各种数据处理模式的托管服务。 这些管道是使用 Apache Beam 编程模型创建的,该模型允许批量和流式处理。

先决条件任务

要使用这些运算符,您必须执行以下几项操作

运行数据管道的方式

根据您的环境、源文件,有多种方法可以运行 Dataflow 管道

  • 非模板化管道:如果您有 Java 的 *.jar 文件或 Python 的 *.py 文件,开发人员可以在 Airflow 工作器上以本地进程的方式运行管道。 这也意味着必须在工作器上安装必要的系统依赖项。 对于 Java,工作器必须安装 JRE 运行时。 对于 Python,则为 Python 解释器。运行时版本必须与管道版本兼容。 这是启动管道的最快方法,但由于其系统依赖项的频繁问题,可能会导致问题。 有关更多详细信息,请参阅: Java SDK 管道, Python SDK 管道。 开发人员还可以通过 JSON 格式传递管道的结构,然后运行它来创建作业。 有关更多详细信息,请参阅: JSON 格式的管道JSON 格式的管道

  • 模板化管道:程序员可以通过准备一个模板来使管道独立于环境,然后该模板将在 Google 管理的机器上运行。 这样,对环境的更改就不会影响您的管道。 模板有两种类型

    • 经典模板。 开发人员运行管道并创建模板。 Apache Beam SDK 在 Cloud Storage 中暂存文件,创建一个模板文件(类似于作业请求),并将模板文件保存在 Cloud Storage 中。 请参阅: 模板化作业

    • Flex 模板。 开发人员将管道打包到 Docker 映像中,然后使用 gcloud 命令行工具在 Cloud Storage 中构建和保存 Flex 模板规范文件。 请参阅: 模板化作业

  • SQL 管道:开发人员可以将管道编写为 SQL 语句,然后在 Dataflow 中执行。 请参阅: Dataflow SQL

最好使用非模板化管道测试您的管道,然后使用模板在生产环境中运行管道。

有关管道类型之间差异的详细信息,请参阅 Google Cloud 文档中的 Dataflow 模板

启动非模板化管道

JSON 格式的管道

可以通过 JSON 格式传递管道的结构来创建新的管道。 请参阅 DataflowCreatePipelineOperator 这将创建一个新的管道,该管道将在 Dataflow Pipelines UI 上可见。

以下是如何通过运行 DataflowCreatePipelineOperator 创建 Dataflow 管道的示例

tests/system/google/cloud/dataflow/example_dataflow_pipeline.py[来源]

create_pipeline = DataflowCreatePipelineOperator(
    task_id="create_pipeline",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body={
        "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
        "type": PIPELINE_TYPE,
        "workload": {
            "dataflowFlexTemplateRequest": {
                "launchParameter": {
                    "containerSpecGcsPath": GCS_PATH,
                    "jobName": PIPELINE_JOB_NAME,
                    "environment": {"tempLocation": TEMP_LOCATION},
                    "parameters": {
                        "inputFile": INPUT_FILE,
                        "output": OUTPUT,
                    },
                },
                "projectId": GCP_PROJECT_ID,
                "location": GCP_LOCATION,
            }
        },
    },
)

要运行新创建的管道,您可以使用 DataflowRunPipelineOperator

tests/system/google/cloud/dataflow/example_dataflow_pipeline.py[来源]

run_pipeline = DataflowRunPipelineOperator(
    task_id="run_pipeline",
    pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
)

调用后,DataflowRunPipelineOperator 将返回通过运行给定管道创建的 Google Cloud Dataflow 作业

有关 API 用法的更多信息,请参阅 Google Cloud 文档中的 Data Pipelines API REST 资源

要使用源文件(Java 中的 JAR 或 Python 文件)创建新管道,请使用创建作业运算符。 源文件可以位于 GCS 上或本地文件系统上。 BeamRunJavaPipelineOperatorBeamRunPythonPipelineOperator

Java SDK 管道

对于 Java 管道,必须为 BeamRunJavaPipelineOperator 指定 jar 参数,因为它包含要在 Dataflow 上执行的管道。 JAR 可以位于 Airflow 可以下载的 GCS 上,也可以位于本地文件系统上(提供其绝对路径)。

以下是使用存储在 GCS 上的 jar 在 Java 中创建和运行管道的示例

tests/system/google/cloud/dataflow/example_dataflow_native_java.py[来源]

start_java_job = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_java_job",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "job_name": f"java-pipeline-job-{ENV_ID}",
        "check_if_running": CheckJobRunning.IgnoreJob,
        "location": LOCATION,
        "poll_sleep": 10,
        "append_job_name": False,
    },
)

以下是在可延迟模式下使用存储在 GCS 上的 jar 在 Java 中创建和运行管道的示例

tests/system/google/cloud/dataflow/example_dataflow_native_java.py[来源]

start_java_deferrable = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_java_job_deferrable",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "job_name": f"deferrable-java-pipeline-job-{ENV_ID}",
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
        "append_job_name": False,
    },
    deferrable=True,
)

以下是使用存储在本地文件系统上的 jar 在 Java 中创建和运行管道的示例

tests/system/google/cloud/dataflow/example_dataflow_native_java.py[来源]

start_java_job_local = BeamRunJavaPipelineOperator(
    task_id="start_java_job_local",
    jar=LOCAL_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
    },
)

Python SDK 管道

必须为 BeamRunPythonPipelineOperator 指定 py_file 参数,因为它包含要在 Dataflow 上执行的管道。 Python 文件可以位于 Airflow 可以下载的 GCS 上,也可以位于本地文件系统上(提供其绝对路径)。

py_interpreter 参数指定执行管道时要使用的 Python 版本,默认为 python3。如果您的 Airflow 实例在 Python 2 上运行,请指定 python2 并确保您的 py_file 在 Python 2 中。 为了获得最佳结果,请使用 Python 3。

如果指定了 py_requirements 参数,将创建一个带有指定要求的临时 Python 虚拟环境,并且将在其中运行管道。

py_system_site_packages 参数指定是否可以从您的 Airflow 实例访问虚拟环境中的所有 Python 包(如果指定了 py_requirements 参数),建议避免,除非 Dataflow 作业需要它。

tests/system/google/cloud/dataflow/example_dataflow_native_python.py[来源]

start_python_job = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_python_job",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)

执行模型

Dataflow 具有多种执行管道的选项。 它可以在以下模式下完成:批处理异步(触发后忘记)、批处理阻塞(等待完成)或流式处理(无限期运行)。 在 Airflow 中,最佳实践是使用异步批处理管道或流,并使用传感器来侦听预期的作业状态。

默认情况下,BeamRunJavaPipelineOperatorBeamRunPythonPipelineOperatorDataflowTemplatedJobStartOperatorDataflowStartFlexTemplateOperator 的参数 wait_until_finished 设置为 None,这会导致不同的行为取决于管道的类型

  • 对于流式管道,等待作业启动,

  • 对于批处理管道,等待作业完成。

如果 wait_until_finished 设置为 True,操作符将始终等待管道执行结束。如果设置为 False,则仅提交作业。

参见:在 Cloud Dataflow 服务上执行的 PipelineOptions 配置

异步执行

Dataflow 批处理作业默认是异步的;但是,这取决于应用程序代码(包含在 JAR 或 Python 文件中)以及它的编写方式。为了使 Dataflow 作业异步执行,请确保不等待管道对象(不在应用程序代码中的 PipelineResult 上调用 waitUntilFinishwait_until_finish)。

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[源代码]

start_python_job_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_async",
    runner=BeamRunnerType.DataflowRunner,
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={
        "job_name": "start_python_job_async",
        "location": LOCATION,
        "wait_until_finished": False,
    },
)

阻塞执行

为了使 Dataflow 作业执行并等待完成,请确保在应用程序代码中等待管道对象。对于 Java SDK,可以通过在从 pipeline.run() 返回的 PipelineResult 上调用 waitUntilFinish 来实现,对于 Python SDK,可以通过在从 pipeline.run() 返回的 PipelineResult 上调用 wait_until_finish 来实现。

应该避免阻塞作业,因为在 Airflow 上运行时会发生后台进程。此进程会持续运行以等待 Dataflow 作业完成,并且会增加 Airflow 在此过程中对资源的消耗。

流式执行

要执行流式 Dataflow 作业,请确保设置了流式选项(对于 Python),或者在您的管道中从无界数据源(例如 Pub/Sub)读取数据(对于 Java)。

tests/system/google/cloud/dataflow/example_dataflow_streaming_python.py[源代码]

start_streaming_python_job = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_streaming_python_job",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "temp_location": GCS_TMP,
        "input_topic": "projects/pubsub-public-data/topics/taxirides-realtime",
        "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
        "streaming": True,
    },
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)

将参数 drain_pipeline 设置为 True 允许通过排空而不是在杀死任务实例期间取消来停止流式作业。

参见:停止正在运行的管道

模板化作业

模板提供了在 Cloud Storage 上暂存管道并从那里运行它的能力。这在开发工作流程中提供了灵活性,因为它将管道的开发与暂存和执行步骤分离开来。Dataflow 有两种类型的模板:经典模板和灵活模板。有关更多信息,请参阅 Dataflow 模板的官方文档

以下是使用经典模板和 DataflowTemplatedJobStartOperator 运行 Dataflow 作业的示例

tests/system/google/cloud/dataflow/example_dataflow_template.py[源代码]

start_template_job = DataflowTemplatedJobStartOperator(
    task_id="start_template_job",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    wait_until_finished=True,
)

对于此操作,您还可以使用可延迟模式的操作符

tests/system/google/cloud/dataflow/example_dataflow_template.py[源代码]

start_template_job_deferrable = DataflowTemplatedJobStartOperator(
    task_id="start_template_job_deferrable",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    deferrable=True,
)

请参阅 可与此操作符一起使用的 Google 提供的模板列表

以下是使用灵活模板和 DataflowStartFlexTemplateOperator 运行 Dataflow 作业的示例

tests/system/google/cloud/dataflow/example_dataflow_template.py[源代码]

start_flex_template_job = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    wait_until_finished=True,
)

对于此操作,您还可以使用可延迟模式的操作符

tests/system/google/cloud/dataflow/example_dataflow_template.py[源代码]

start_flex_template_job_deferrable = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job_deferrable",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    deferrable=True,
)

Dataflow SQL

Dataflow SQL 支持 ZetaSQL 查询语法的变体,并包括用于运行 Dataflow 流式作业的其他流式扩展。

以下是使用 DataflowStartSqlJobOperator 运行 Dataflow SQL 作业的示例

tests/system/google/cloud/dataflow/example_dataflow_sql.py[源代码]

start_sql = DataflowStartSqlJobOperator(
    task_id="start_sql_query",
    job_name=DATAFLOW_SQL_JOB_NAME,
    query=f"""
        SELECT
            emp_name as employee,
            salary as employee_salary
        FROM
            bigquery.table.`{PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
        WHERE salary >= 1000;
    """,
    options={
        "bigquery-project": PROJECT_ID,
        "bigquery-dataset": BQ_SQL_DATASET,
        "bigquery-table": BQ_SQL_TABLE_OUTPUT,
    },
    location=LOCATION,
    do_xcom_push=True,
)

警告

此操作符需要 Airflow 工作节点上安装 gcloud 命令(Google Cloud SDK) <https://cloud.google.com/sdk/docs/install>`__

请参阅 Dataflow SQL 参考

Dataflow YAML

Beam YAML 是一个无代码 SDK,用于通过使用 YAML 文件配置 Apache Beam 管道。您可以使用 Beam YAML 来编写和运行 Beam 管道,而无需编写任何代码。此 API 可用于定义流式和批处理管道。

以下是使用 DataflowStartYamlJobOperator 运行 Dataflow YAML 作业的示例

tests/system/google/cloud/dataflow/example_dataflow_yaml.py[源代码]

start_dataflow_yaml_job = DataflowStartYamlJobOperator(
    task_id="start_dataflow_yaml_job",
    job_name=DATAFLOW_YAML_JOB_NAME,
    yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
    append_job_name=True,
    deferrable=False,
    region=REGION,
    project_id=PROJECT_ID,
    jinja_variables=BQ_VARIABLES,
)

可以通过传递 deferrable=True 作为参数,以可延迟模式运行此操作符。

tests/system/google/cloud/dataflow/example_dataflow_yaml.py[源代码]

start_dataflow_yaml_job_def = DataflowStartYamlJobOperator(
    task_id="start_dataflow_yaml_job_def",
    job_name=DATAFLOW_YAML_JOB_NAME,
    yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
    append_job_name=True,
    deferrable=True,
    region=REGION,
    project_id=PROJECT_ID,
    jinja_variables=BQ_VARIABLES_DEF,
    expected_terminal_state=DataflowJobStatus.JOB_STATE_DONE,
)

警告

此操作符需要 Airflow 工作节点上安装 gcloud 命令(Google Cloud SDK) <https://cloud.google.com/sdk/docs/install>`__

请参阅 Dataflow YAML 参考

停止管道

要停止一个或多个 Dataflow 管道,可以使用 DataflowStopJobOperator。默认情况下,会排空流式管道,将 drain_pipeline 设置为 False 将改为取消它们。提供 job_id 以停止特定作业,或提供 job_name_prefix 以停止所有具有指定名称前缀的作业。

tests/system/google/cloud/dataflow/example_dataflow_native_python.py[来源]

stop_dataflow_job = DataflowStopJobOperator(
    task_id="stop_dataflow_job",
    location=LOCATION,
    job_name_prefix="start-python-pipeline",
)

参见:停止正在运行的管道

删除管道

要删除 Dataflow 管道,可以使用 DataflowDeletePipelineOperator。以下是如何使用此操作符的示例

tests/system/google/cloud/dataflow/example_dataflow_pipeline.py[来源]

delete_pipeline = DataflowDeletePipelineOperator(
    task_id="delete_pipeline",
    pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

传感器

当异步触发作业时,可以使用传感器来运行特定作业属性的检查。

DataflowJobStatusSensor.

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[源代码]

wait_for_python_job_async_done = DataflowJobStatusSensor(
    task_id="wait_for_python_job_async_done",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    location=LOCATION,
)

可以通过传递 deferrable=True 作为参数,以可延迟模式运行此操作符。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[源代码]

wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
    task_id="wait_for_beam_python_pipeline_job_status_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
    location=LOCATION,
    deferrable=True,
)

DataflowJobMetricsSensor.

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[源代码]

def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check is metric greater than equals to given value."""

    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == "Service-cpu_num_seconds" and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")

    return callback

wait_for_python_job_async_metric = DataflowJobMetricsSensor(
    task_id="wait_for_python_job_async_metric",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
)

可以通过传递 deferrable=True 作为参数,以可延迟模式运行此操作符。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[源代码]

def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check is metric greater than equals to given value."""

    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == "Service-cpu_num_seconds" and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")

    return callback

wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
    task_id="wait_for_beam_python_pipeline_job_metric_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
    deferrable=True,
)

DataflowJobMessagesSensor.

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[源代码]

def check_message(messages: list[dict]) -> bool:
    """Check message"""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False

wait_for_python_job_async_message = DataflowJobMessagesSensor(
    task_id="wait_for_python_job_async_message",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_message,
    fail_on_terminal_state=False,
)

可以通过传递 deferrable=True 作为参数,以可延迟模式运行此操作符。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[源代码]

def check_job_message(messages: list[dict]) -> bool:
    """Check job message."""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False

wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
    task_id="wait_for_beam_python_pipeline_job_message_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_job_message,
    fail_on_terminal_state=False,
    deferrable=True,
)

DataflowJobAutoScalingEventsSensor.

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[源代码]

def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event"""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False

wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_python_job_async_autoscaling_event",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
)

可以通过传递 deferrable=True 作为参数,以可延迟模式运行此操作符。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[源代码]

def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event."""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False

wait_for_beam_python_pipeline_job_autoscaling_event_def = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
    deferrable=True,
)

此条目是否有帮助?